[GitHub] flink issue #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOptTable,...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/4681 @fhueske Thanks for your advice. I have already updated the PR based on your suggestion. ---
[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/4681#discussion_r140462913 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.prepare.CalcitePrepareImpl +import java.util.{List => JList} + +import com.google.common.collect.ImmutableList +import org.apache.calcite.adapter.enumerable.EnumerableTableScan +import org.apache.calcite.linq4j.tree.Expression +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.prepare.Prepare.PreparingTable +import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory} +import org.apache.calcite.rel._ +import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.calcite.runtime.Hook +import org.apache.calcite.schema.{StreamableTable, TranslatableTable} +import org.apache.calcite.sql.SqlAccessType +import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity} +import org.apache.calcite.sql2rel.InitializerContext +import org.apache.calcite.util.ImmutableBitSet +import org.apache.flink.table.plan.stats.FlinkStatistic + +import scala.collection.JavaConverters._ + +/** + * FlinkRelOptTable wraps a FlinkTable + * + * @param schema the [[RelOptSchema]] this table belongs to + * @param rowType the type of rows returned by this table + * @param names the identifier for this table. The identifier must be unique with + *respect to the Connection producing this table. + * @param table wrapped flink table + */ + +class FlinkRelOptTable protected( +schema: RelOptSchema, +rowType: RelDataType, +names: JList[String], +table: FlinkTable[_]) extends PreparingTable { + + /** +* Creates a copy of this Flink RelOptTable with new Flink Table and new row type. 
+* +* @param newTable new flink table +* @param typeFactory type factory to create new row type of new flink table +* @return The copy of this Flink RelOptTable with new Flink table and new row type +*/ + def copy(newTable: FlinkTable[_], typeFactory: RelDataTypeFactory): FlinkRelOptTable = { +val newRowType = newTable.getRowType(typeFactory) +FlinkRelOptTable.create(schema, newRowType, names, newTable) + } + + /** +* Extends a table with the given extra fields, which is not supported now. +* +* @param extendedFields --- End diff -- Do you mean I should add a comment for extendedFields? ---
[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/4681#discussion_r140462814 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.prepare.CalcitePrepareImpl +import java.util.{List => JList} + +import com.google.common.collect.ImmutableList +import org.apache.calcite.adapter.enumerable.EnumerableTableScan +import org.apache.calcite.linq4j.tree.Expression +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.prepare.Prepare.PreparingTable +import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory} +import org.apache.calcite.rel._ +import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.calcite.runtime.Hook +import org.apache.calcite.schema.{StreamableTable, TranslatableTable} +import org.apache.calcite.sql.SqlAccessType +import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity} +import org.apache.calcite.sql2rel.InitializerContext +import org.apache.calcite.util.ImmutableBitSet +import org.apache.flink.table.plan.stats.FlinkStatistic + +import scala.collection.JavaConverters._ + +/** + * FlinkRelOptTable wraps a FlinkTable + * + * @param schema the [[RelOptSchema]] this table belongs to + * @param rowType the type of rows returned by this table + * @param names the identifier for this table. The identifier must be unique with + *respect to the Connection producing this table. + * @param table wrapped flink table + */ + +class FlinkRelOptTable protected( +schema: RelOptSchema, +rowType: RelDataType, +names: JList[String], +table: FlinkTable[_]) extends PreparingTable { --- End diff -- done ---
[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/4681#discussion_r140462752 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.prepare.CalcitePrepareImpl +import java.util.{List => JList} + +import com.google.common.collect.ImmutableList +import org.apache.calcite.adapter.enumerable.EnumerableTableScan +import org.apache.calcite.linq4j.tree.Expression +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.prepare.Prepare.PreparingTable +import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory} +import org.apache.calcite.rel._ +import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.calcite.runtime.Hook +import org.apache.calcite.schema.{StreamableTable, TranslatableTable} +import org.apache.calcite.sql.SqlAccessType +import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity} +import org.apache.calcite.sql2rel.InitializerContext +import org.apache.calcite.util.ImmutableBitSet +import org.apache.flink.table.plan.stats.FlinkStatistic + +import scala.collection.JavaConverters._ + +/** + * FlinkRelOptTable wraps a FlinkTable + * + * @param schema the [[RelOptSchema]] this table belongs to + * @param rowType the type of rows returned by this table + * @param names the identifier for this table. The identifier must be unique with + *respect to the Connection producing this table. + * @param table wrapped flink table + */ + +class FlinkRelOptTable protected( +schema: RelOptSchema, +rowType: RelDataType, +names: JList[String], --- End diff -- done ---
[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/4681 [FLINK-7636][Table API & SQL]Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor

## What is the purpose of the change

There are two ways to fetch the TableSource of a TableSourceScan node (e.g. FlinkLogicalTableSourceScan, PhysicalTableSourceScan and its subclasses):

1. {code}
val relOptTable: RelOptTable = getTable()
val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
val tableSource = tableSourceTable.tableSource
{code}
The result of getTable() is currently an instance of RelOptTableImpl, and it does not change after the RelNode tree is built.
2. The TableSourceScan node takes a tableSource as a constructor parameter, so the tableSource can be fetched directly later.

After projection push-down (PPD) or filter push-down (FPD) is applied, these two ways return different TableSources. This is very confusing and causes problems.

This PR fixes the problem by introducing FlinkRelOptTable to replace RelOptTableImpl and by removing the tableSource parameter from TableSourceScan's constructor. After PPD or FPD, a new FlinkRelOptTable instance containing a new TableSourceTable is passed to the TableSourceScan constructor.

## Brief change log
- *Adds FlinkRelOptTable to replace the default RelOptTable implementation (RelOptTableImpl)*
- *Adds FlinkCalciteCatalogReader, a subclass of CalciteCatalogReader. It overrides the getTable method to return a FlinkRelOptTable instance instead of a RelOptTableImpl instance*
- *Removes the tableSource parameter from TableSourceScan's constructor. A new FlinkRelOptTable instance containing a new TableSourceTable is passed to the TableSourceScan constructor.*

## Verifying this change
This change added tests and can be verified as follows:
- *Added a test that validates that a FlinkRelOptTable instance is returned when calling the getTable method of FlinkCalciteCatalogReader*
- *Other changes are already covered by existing tests, such as TableSourceTest and TableSourceITCase*

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation
- Does this pull request introduce a new feature? (no)

You can merge this pull request into a Git repository by running: $ git pull https://github.com/beyond1920/flink FLINK-7636 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4681.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4681 commit 7904bd6468f7ae482c4f906e52b29f477fe28602 Author: jingzhang <beyond1...@126.com> Date: 2017-09-18T10:01:24Z [FLINK-7636][Table API & SQL]Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor ---
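To make the inconsistency described in this PR concrete, here is a minimal, dependency-free Scala sketch. It does not use Calcite or Flink: `SimpleTableSource`, `SimpleRelOptTable`, `OldScan` and `NewScan` are hypothetical stand-ins. The old-style scan holds the source twice (inside its table and as a constructor parameter), and a push-down that only rewrites the parameter leaves the two out of sync; the new-style scan derives the source from its table, so a push-down has to produce a new table, roughly in the spirit of `FlinkRelOptTable.copy`.

```scala
// Hypothetical stand-ins; these are NOT the real Flink/Calcite classes.
case class SimpleTableSource(fields: Seq[String]) {
  // Keeps only the requested fields, like a projection push-down would.
  def projectFields(keep: Seq[String]): SimpleTableSource =
    copy(fields = fields.filter(f => keep.contains(f)))
}

// Plays the role of a RelOptTable that wraps a TableSourceTable.
case class SimpleRelOptTable(names: Seq[String], source: SimpleTableSource) {
  // Same identifier, new wrapped source (loosely analogous to FlinkRelOptTable.copy).
  def copyWith(newSource: SimpleTableSource): SimpleRelOptTable = copy(source = newSource)
}

// Old design: the source is reachable both via the table and via a constructor parameter.
case class OldScan(table: SimpleRelOptTable, tableSource: SimpleTableSource) {
  def sourceViaTable: SimpleTableSource = table.source
  def sourceViaParam: SimpleTableSource = tableSource
  // Push-down rewrites only the parameter; the table stays as it was built.
  def pushProjection(keep: Seq[String]): OldScan =
    copy(tableSource = tableSource.projectFields(keep))
}

// New design: the table is the single source of truth.
case class NewScan(table: SimpleRelOptTable) {
  def tableSource: SimpleTableSource = table.source
  def pushProjection(keep: Seq[String]): NewScan =
    NewScan(table.copyWith(table.source.projectFields(keep)))
}
```

With `NewScan` there is only one place the source lives, so there is nothing that can drift apart after push-down.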
[GitHub] flink issue #3409: [flink-5570] [Table API & SQL]Support register external c...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/3409 @fhueske @twalthr , thanks for the reviews. I've updated the pr based on your comments. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3409#discussion_r107636567 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala --- @@ -511,15 +511,15 @@ case class Join( } case class CatalogNode( -tableName: String, -rowType: RelDataType) extends LeafNode { +rowType: RelDataType, +tablePath: String*) extends LeafNode { --- End diff -- ok ---
[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3409#discussion_r107636115 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala --- @@ -178,4 +178,32 @@ class BatchTableEnvironment( registerTableFunctionInternal[T](name, tf) } + + /** +* Scans a table from registered temporary tables and registered catalogs. +* +* The table to scan must be registered in the TableEnvironment or +* must exist in registered catalog in the TableEnvironment. +* +* Example: +* +* to scan a registered temporary table +* {{{ +* val tab: Table = tableEnv.scan("tableName") +* }}} +* +* to scan a table from a registered catalog +* {{{ +* val tab: Table = tableEnv.scan("catalogName.dbName.tableName") +* }}} +* +* @param tablePath The path of the table to scan. +* @throws TableException if no table is found using the given table path. +* @return The resulting [[Table]]. +*/ + @throws[TableException] + def scan(tablePath: String): Table = { --- End diff -- good idea, thanks ---
[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3409#discussion_r107061507 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala --- @@ -511,15 +511,15 @@ case class Join( } case class CatalogNode( -tableName: String, -rowType: RelDataType) extends LeafNode { +rowType: RelDataType, +tablePath: String*) extends LeafNode { --- End diff -- Is using arrays instead of varargs in internal classes a convention? I can do it that way; however, varargs are more convenient to use than arrays in some cases. ---
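One concrete Scala consideration for this question, independent of whatever convention the reviewers had in mind: a case class field typed `Array[String]` is compared by reference in the generated `equals`, while a repeated parameter (`String*`) is stored as a `Seq[String]` and compared structurally. The sketch below uses hypothetical simplified classes `CatalogNodeVarargs` and `CatalogNodeArray`, with the `rowType` field omitted. In the diff, `rowType` comes first because a repeated parameter must be the last one in its parameter list.

```scala
// Hypothetical simplified versions of CatalogNode; the RelDataType field is omitted.

// Repeated parameter: stored as a Seq[String], so equals/hashCode are structural.
case class CatalogNodeVarargs(tablePath: String*)

// Array field: the generated equals/hashCode use Array's reference equality.
case class CatalogNodeArray(tablePath: Array[String])

object PathHelpers {
  // Passing an existing collection to a varargs parameter needs the `: _*` ascription.
  def fromSeq(path: Seq[String]): CatalogNodeVarargs = CatalogNodeVarargs(path: _*)
}
```

For example, `CatalogNodeVarargs("cat", "db", "tbl") == CatalogNodeVarargs("cat", "db", "tbl")` holds, whereas two `CatalogNodeArray` instances built from equal but distinct arrays are not equal.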
[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3409#discussion_r107060990 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala --- @@ -165,3 +165,31 @@ case class AmbiguousTableSourceConverterException( def this(tableType: String) = this(tableType, null) } + +/** + * Exception for operation on a nonexistent external catalog + * + * @param catalogName external catalog name + * @param cause + */ +case class ExternalCatalogNotExistException( --- End diff -- Hi @twalthr, do you mean that we could introduce two superclasses, AlreadyExistException and NotExistException, and make the other exceptions subclasses of these two? IMO, there is no need to introduce a superclass, because each exception represents a different unexpected situation that users may care about: they may want to know whether it is the database, the table, or something else that does not exist. What's your opinion? ---
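The trade-off under discussion can be sketched as follows. Only the shape of `ExternalCatalogNotExistException(catalogName, cause)` is taken from the diff; `TableNotExistException`, the messages, and the `CatalogObjectNotExistException` base trait are hypothetical. A shared base type and distinct subclasses can coexist, so the sketch shows both ways a caller might handle them: matching on the specific class to learn which object is missing, or checking only the base type.

```scala
// Hypothetical optional base type; the PR as quoted does not define it.
sealed trait CatalogObjectNotExistException extends RuntimeException

// Shape taken from the diff: catalog name plus an optional cause (message is hypothetical).
case class ExternalCatalogNotExistException(catalogName: String, cause: Throwable)
  extends RuntimeException(s"Catalog $catalogName does not exist.", cause)
  with CatalogObjectNotExistException {
  def this(catalogName: String) = this(catalogName, null)
}

// Hypothetical sibling exception for a missing table.
case class TableNotExistException(dbName: String, tableName: String, cause: Throwable)
  extends RuntimeException(s"Table $dbName.$tableName does not exist.", cause)
  with CatalogObjectNotExistException {
  def this(dbName: String, tableName: String) = this(dbName, tableName, null)
}

object Handling {
  // Fine-grained handling: the caller learns exactly which object is missing.
  def describe(e: Throwable): String = e match {
    case ExternalCatalogNotExistException(c, _) => s"missing catalog: $c"
    case TableNotExistException(db, t, _)       => s"missing table: $db.$t"
    case _                                      => "other error"
  }

  // Coarse handling through the (hypothetical) shared base type.
  def isNotExist(e: Throwable): Boolean = e.isInstanceOf[CatalogObjectNotExistException]
}
```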
[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3409#discussion_r107059962 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala --- @@ -178,4 +178,32 @@ class BatchTableEnvironment( registerTableFunctionInternal[T](name, tf) } + + /** +* Scans a table from registered temporary tables and registered catalogs. +* +* The table to scan must be registered in the TableEnvironment or +* must exist in registered catalog in the TableEnvironment. +* +* Example: +* +* to scan a registered temporary table +* {{{ +* val tab: Table = tableEnv.scan("tableName") +* }}} +* +* to scan a table from a registered catalog +* {{{ +* val tab: Table = tableEnv.scan("catalogName.dbName.tableName") +* }}} +* +* @param tablePath The path of the table to scan. +* @throws TableException if no table is found using the given table path. +* @return The resulting [[Table]]. +*/ + @throws[TableException] + def scan(tablePath: String): Table = { --- End diff -- Hi @twalthr, there was already a scan method, `def scan(tableName: String)`. In the first commit I added another one, `def scan(catalogName: String, dbName: String, tableName: String)`. Fabian suggested that we instead extend the existing scan(String) to accept varargs parameters. We would then need to push the implementation down to the Scala and Java versions of BatchTableEnvironment and StreamTableEnvironment, because Scala and Java handle varargs differently. This keeps the API more concise, because we only need a single scan() method. I think it's a good idea, so I updated the PR in the second commit. Maybe I'm confused about something here. Any advice? @fhueske, @twalthr ---
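A minimal sketch of the single varargs `scan` idea discussed above. `TableRegistry`, its nested maps, the `String` return value standing in for `Table`, and the lookup rules (one segment means a registered table, three segments mean catalog, database, table) are hypothetical simplifications, not the TableEnvironment implementation; `IllegalArgumentException` stands in for `TableException`. The sketch uses `@scala.annotation.varargs`, which makes the Scala compiler additionally emit a Java-style varargs signature. That is one possible way to deal with the Scala/Java varargs difference; the comment above instead proposes implementing the method in the language-specific environments.

```scala
import scala.annotation.varargs

// Hypothetical, simplified stand-in for a TableEnvironment's table lookup.
class TableRegistry(
    registeredTables: Map[String, String],
    catalogs: Map[String, Map[String, Map[String, String]]]) {

  // Single entry point: scan("t") or scan("catalog", "db", "t").
  // @varargs also generates a Java-callable `scan(String... tablePath)` overload.
  @varargs
  def scan(tablePath: String*): String = tablePath match {
    case Seq(table) =>
      registeredTables.getOrElse(table, notFound(tablePath))
    case Seq(catalog, db, table) =>
      catalogs.get(catalog)
        .flatMap(_.get(db))
        .flatMap(_.get(table))
        .getOrElse(notFound(tablePath))
    case _ =>
      notFound(tablePath)
  }

  private def notFound(path: Seq[String]): Nothing =
    throw new IllegalArgumentException(s"Table '${path.mkString(".")}' was not found.")
}
```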
[GitHub] flink pull request #3569: [flink-6036]Let catalog support partition
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/3569 [flink-6036]Let catalog support partition This PR aims to add partition support to the catalog. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink externalCatalog-with-partition Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3569.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3569 commit 3c81e0601ed81b76a1adabb2f38ece46aa07e78e Author: jingzhang <beyond1...@126.com> Date: 2017-03-20T04:09:59Z Let catalog support partition ---
[GitHub] flink issue #3559: [flink-6037] [Table API & SQL]hotfix: metadata provider d...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/3559 @KurtYoung , thanks for the review. I've updated the pr based on your comments. ---
[GitHub] flink pull request #3559: [flink-6037] [Table API & SQL]hotfix: metadata pro...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3559#discussion_r106797445 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala --- @@ -100,6 +101,7 @@ class FlinkPlannerImpl( assert(validatedSqlNode != null) val rexBuilder: RexBuilder = createRexBuilder val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder) + cluster.setMetadataProvider(FlinkDefaultRelMetadataProvider.INSTANCE) --- End diff -- Because org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery reads the metadata provider from the cluster, using logic like `RelMetadataQuery.THREAD_PROVIDERS.set( JaninoRelMetadataProvider.of(cluster.getMetadataProvider()));` ---
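The point of this reply is ordering: `convertQuery` copies `cluster.getMetadataProvider()` into a thread-local, so the Flink provider has to be set on the cluster before conversion runs. The self-contained sketch below models only that ordering with a plain `ThreadLocal`. `ToyMetadata`, `ToyCluster`, `ToyConverter` and the string-valued providers are hypothetical and do not use Calcite's API.

```scala
// Hypothetical model of the ordering issue; this is not Calcite code.
object ToyMetadata {
  // Stands in for RelMetadataQuery.THREAD_PROVIDERS.
  val threadProvider: ThreadLocal[String] = new ThreadLocal[String]
}

class ToyCluster {
  // Stands in for the cluster's default metadata provider.
  private var provider: String = "default"
  def setMetadataProvider(p: String): Unit = provider = p
  def getMetadataProvider: String = provider
}

object ToyConverter {
  // Like convertQuery in this model: copies the cluster's provider into the
  // thread-local at conversion time and returns the provider now in effect.
  // A provider set on the cluster afterwards is not picked up by this call.
  def convertQuery(cluster: ToyCluster): String = {
    ToyMetadata.threadProvider.set(cluster.getMetadataProvider)
    ToyMetadata.threadProvider.get
  }
}
```

Setting the provider right after `RelOptCluster.create`, as in the diff, corresponds to the "set first, then convert" order in this model.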
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r106648264 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala --- @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.util.{HashMap => JHashMap, Map => JMap} +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.plan.stats.TableStats + +/** + * Table definition of the external catalog. 
+ * + * @param identifier identifier of external catalog table, including dbName and tableName + * @param tableType type of external catalog table, e.g csv, hbase, kafka + * @param schema schema of table data, including column names and column types + * @param properties properties of external catalog table + * @param stats statistics of external catalog table + * @param comment comment of external catalog table + * @param createTime create time of external catalog table + * @param lastAccessTime last access time of external catalog table + */ +case class ExternalCatalogTable( +identifier: TableIdentifier, +tableType: String, +schema: DataSchema, --- End diff -- Right, thanks for reminding me. ---
[GitHub] flink pull request #3559: [flink-6037] [Table API & SQL]hotfix: metadata pro...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/3559 [flink-6037] [Table API & SQL]hotfix: metadata provider didn't work in SQL This PR fixes the bug reported in https://issues.apache.org/jira/browse/FLINK-6037. Once the right MetadataProvider is used, the test org.apache.flink.table.ExpressionReductionTest.testReduceCalcExpressionForBatchSQL no longer passes because the optimized plan changes (this problem is tracked in https://issues.apache.org/jira/browse/FLINK-6067 and will be fixed in another PR). In this PR, I simply changed the test SQL so that the test passes. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink hotfix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3559.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3559 commit bd83041507b3f4fdea737b538fb39af4a249e6d2 Author: jingzhang <beyond1...@126.com> Date: 2017-03-17T09:47:35Z fix the bug: metadata provider didn't work in SQL ---
[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/3406 @twalthr, thanks for the review. I have updated the PR based on your suggestions. I will add documentation later in PR #3409. About the name ExternalCatalog: there are currently three kinds of catalog in Flink: the Calcite catalog, the function catalog, and the external catalog. I added the 'external' prefix to distinguish them. ---
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r106404460 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import org.apache.flink.table.annotation.TableType +import org.apache.flink.table.catalog.{ExternalCatalogTable, TableSourceConverter} + +import scala.collection.JavaConverters._ +import java.util.{Set => JSet} + +import com.google.common.collect.ImmutableSet + +/** + * The class defines a converter used to convert [[CsvTableSource]] to + * or from [[ExternalCatalogTable]]. 
+ */ +@TableType(value = "csv") +class CsvTableSourceConverter extends TableSourceConverter[CsvTableSource] { + + private val required: JSet[String] = ImmutableSet.of("path", "fieldDelim", "rowDelim") + + override def requiredProperties: JSet[String] = required + + override def fromExternalCatalogTable( + externalCatalogTable: ExternalCatalogTable): CsvTableSource = { +val params = externalCatalogTable.properties.asScala +val csvTableSourceBuilder = new CsvTableSource.Builder + +params.get("path").foreach(csvTableSourceBuilder.path) +params.get("fieldDelim").foreach(csvTableSourceBuilder.fieldDelimiter) +params.get("rowDelim").foreach(csvTableSourceBuilder.lineDelimiter) +params.get("quoteCharacter").foreach(quoteStr => + if (quoteStr.length != 1) { +throw new IllegalArgumentException("the value of param quoteCharacter is invalid") --- End diff -- Ok ---
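The converter in this diff reads its properties from a map and rejects a `quoteCharacter` that is not exactly one character long. The sketch below reproduces that validation pattern without Flink dependencies. `CsvOptions` and `CsvPropertyParser` are hypothetical; the property keys are the ones from the diff, and the explicit up-front check for missing required keys is an assumption about how `requiredProperties` might be enforced, not something the diff shows.

```scala
// Hypothetical, dependency-free version of the property handling in the diff.
case class CsvOptions(
    path: String,
    fieldDelim: String,
    rowDelim: String,
    quoteCharacter: Option[Char] = None)

object CsvPropertyParser {
  // Same keys as the `required` set in CsvTableSourceConverter.
  val requiredProperties: Set[String] = Set("path", "fieldDelim", "rowDelim")

  def parse(params: Map[String, String]): CsvOptions = {
    // Assumed enforcement: fail fast if any required key is missing.
    val missing = requiredProperties.filterNot(params.contains)
    if (missing.nonEmpty) {
      throw new IllegalArgumentException(
        s"Missing required properties: ${missing.toSeq.sorted.mkString(", ")}")
    }
    // quoteCharacter is optional, but when present it must be a single character.
    val quote = params.get("quoteCharacter").map { quoteStr =>
      if (quoteStr.length != 1) {
        throw new IllegalArgumentException(
          s"The value of param quoteCharacter is invalid: '$quoteStr'")
      }
      quoteStr.charAt(0)
    }
    CsvOptions(params("path"), params("fieldDelim"), params("rowDelim"), quote)
  }
}
```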
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r106403723 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.lang.reflect.Modifier +import java.net.URL + +import org.apache.commons.configuration.{ConfigurationException, ConversionException, PropertiesConfiguration} +import org.apache.flink.table.annotation.TableType +import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException} +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.sources.TableSource +import org.apache.flink.util.InstantiationUtil +import org.reflections.Reflections +import org.slf4j.{Logger, LoggerFactory} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * The utility class is used to convert ExternalCatalogTable to TableSourceTable. 
+ */ +object ExternalTableSourceUtil { + + // config file to specify scan package to search TableSourceConverter + private val tableSourceConverterConfigFileName = "tableSourceConverter.properties" + + private val LOG: Logger = LoggerFactory.getLogger(this.getClass) + + // registered table type with the TableSourceConverter. + // Key is table type name, Value is set of converter class. + private val tableTypeToTableSourceConvertersClazz = { +val registeredConverters = + new mutable.HashMap[String, mutable.Set[Class[_ <: TableSourceConverter[_ + with mutable.MultiMap[String, Class[_ <: TableSourceConverter[_]]] +// scan all config files to find TableSourceConverters which are annotationed with TableType. +val resourceUrls = getClass.getClassLoader.getResources(tableSourceConverterConfigFileName) +while (resourceUrls.hasMoreElements) { + val url = resourceUrls.nextElement() + val scanPackages = parseScanPackagesFromConfigFile(url) + scanPackages.foreach(scanPackage => { +val clazzWithAnnotations = new Reflections(scanPackage) +.getTypesAnnotatedWith(classOf[TableType]) +clazzWithAnnotations.asScala.foreach(clazz => + if (classOf[TableSourceConverter[_]].isAssignableFrom(clazz)) { +if (Modifier.isAbstract(clazz.getModifiers()) || --- End diff -- Yes, I will add a check for this based on InstantiationUtil. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
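The kind of check mentioned in this reply can be sketched with plain Java reflection. This is an illustrative sketch, not Flink's InstantiationUtil: the object `ConverterClassCheck` and its method are hypothetical, and the rules (public, not abstract, not a non-static nested class, has a public no-argument constructor) are assumptions about what "can be instantiated reflectively" should mean for a discovered converter class.

```scala
import java.lang.reflect.Modifier

// Hypothetical helper, not Flink's InstantiationUtil: decides whether a
// discovered converter class can be instantiated reflectively.
object ConverterClassCheck {

  /** Returns a reason why `clazz` cannot be instantiated, or None if it can. */
  def instantiationError(clazz: Class[_]): Option[String] = {
    val mods = clazz.getModifiers
    if (!Modifier.isPublic(mods)) {
      Some(s"${clazz.getName} is not public")
    } else if (clazz.isInterface || Modifier.isAbstract(mods)) {
      Some(s"${clazz.getName} is an interface or abstract class")
    } else if (clazz.getEnclosingClass != null && !Modifier.isStatic(mods)) {
      // Inner, local and anonymous classes need an enclosing instance.
      Some(s"${clazz.getName} is a non-static nested class")
    } else if (!clazz.getConstructors.exists(_.getParameterCount == 0)) {
      // getConstructors only returns public constructors.
      Some(s"${clazz.getName} has no public no-argument constructor")
    } else {
      None
    }
  }
}
```

In the scanning loop, a class for which `instantiationError` returns `Some(...)` would be logged and skipped rather than registered.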
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r106395036 --- Diff: flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties --- @@ -0,0 +1,19 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + --- End diff -- Ok. ---
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r106395005 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala --- @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.util.{HashMap => JHashMap, Map => JMap} +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.plan.stats.TableStats + +/** + * Table definition of the external catalog. 
+ * + * @param identifier identifier of external catalog table, including dbName and tableName + * @param tableType type of external catalog table, e.g csv, hbase, kafka + * @param schema schema of table data, including column names and column types + * @param properties properties of external catalog table + * @param stats statistics of external catalog table + * @param comment comment of external catalog table + * @param createTime create time of external catalog table + * @param lastAccessTime last access time of of external catalog table + */ +case class ExternalCatalogTable( +identifier: TableIdentifier, +tableType: String, +schema: DataSchema, --- End diff -- ColumnSchema sounds like a schema that describes a single column, not one that describes a whole table. TableSchema would be a better name, but there is already a class named TableSchema, so I used DataSchema. ---
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r106394128 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala --- @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import org.apache.flink.table.api._ + +/** + * This class is responsible for interact with external catalog. + * Its main responsibilities including: + * + * create/drop/alter database or tables for DDL operations + * provide tables for calcite catalog, it looks up databases or tables in the external catalog + * + */ +trait CrudExternalCatalog extends ExternalCatalog { + + /** +* Adds table into external Catalog +* +* @param table description of table which to create +* @param ignoreIfExists if table already exists in the catalog, not throw exception and leave +* the existed table if ignoreIfExists is true; +* else throw a TableAlreadyExistException. 
+* @throws DatabaseNotExistException if database does not exist in the catalog yet +* @throws TableAlreadyExistException if table already exists in the catalog and +*ignoreIfExists is false +*/ + @throws[DatabaseNotExistException] + @throws[TableAlreadyExistException] + def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit + + /** +* Deletes table from external Catalog +* +* @param dbName database name +* @param tableName table name +* @param ignoreIfNotExists if table not exist yet, not throw exception if ignoreIfNotExists is +* true; else throw TableNotExistException +* @throws DatabaseNotExistException if database does not exist in the catalog yet +* @throws TableNotExistException if table does not exist in the catalog yet +*/ + @throws[DatabaseNotExistException] + @throws[TableNotExistException] + def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit --- End diff -- Consider how these methods are used: when we create or alter a database, we may need to specify properties other than the database name, so createDatabase and alterDatabase take an ExternalCatalogDatabase instance, which includes the database name and the other properties. When we drop a database, however, the name is enough. Take DDL as an example: CREATE DATABASE czech_slovak_names CHARACTER SET = 'keybcs2' COLLATE = 'keybcs2_bin'; versus DROP DATABASE [IF EXISTS] database_name; ---
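The asymmetry described above (create/alter take a full descriptor, drop takes only a name) can be sketched as follows. `ExternalCatalogDatabase` is named in the reply, but its fields here (a name plus a property map), the trait `DatabaseOperations`, the in-memory implementation, and both exception classes are hypothetical stand-ins chosen for illustration.

```scala
import scala.collection.mutable

// Hypothetical descriptor: a name plus free-form properties
// (e.g. character set and collation, as in the DDL example).
case class ExternalCatalogDatabase(name: String, properties: Map[String, String] = Map.empty)

// Local stand-ins for the catalog exceptions.
class DatabaseAlreadyExistException(name: String)
  extends RuntimeException(s"Database '$name' already exists")
class DatabaseNotExistException(name: String)
  extends RuntimeException(s"Database '$name' does not exist")

trait DatabaseOperations {
  // create/alter need the full descriptor because properties may be set.
  def createDatabase(db: ExternalCatalogDatabase, ignoreIfExists: Boolean): Unit
  def alterDatabase(db: ExternalCatalogDatabase, ignoreIfNotExists: Boolean): Unit
  // drop only needs the name.
  def dropDatabase(dbName: String, ignoreIfNotExists: Boolean): Unit
  def getDatabase(dbName: String): ExternalCatalogDatabase
}

class InMemoryDatabaseOperations extends DatabaseOperations {
  private val databases = mutable.HashMap.empty[String, ExternalCatalogDatabase]

  override def createDatabase(db: ExternalCatalogDatabase, ignoreIfExists: Boolean): Unit =
    if (databases.contains(db.name)) {
      if (!ignoreIfExists) throw new DatabaseAlreadyExistException(db.name)
    } else {
      databases.update(db.name, db)
    }

  override def alterDatabase(db: ExternalCatalogDatabase, ignoreIfNotExists: Boolean): Unit =
    if (databases.contains(db.name)) databases.update(db.name, db)
    else if (!ignoreIfNotExists) throw new DatabaseNotExistException(db.name)

  override def dropDatabase(dbName: String, ignoreIfNotExists: Boolean): Unit =
    if (databases.remove(dbName).isEmpty && !ignoreIfNotExists) {
      throw new DatabaseNotExistException(dbName)
    }

  override def getDatabase(dbName: String): ExternalCatalogDatabase =
    databases.getOrElse(dbName, throw new DatabaseNotExistException(dbName))
}
```

Passing only a name to `dropDatabase` also avoids forcing callers to build a descriptor whose properties would be ignored anyway.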
[GitHub] flink issue #3409: [flink-5570] [Table API & SQL]Support register external c...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/3409 @fhueske, thanks for the review. I have updated the PR based on your suggestions. ---
[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3409#discussion_r10636 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -353,19 +388,48 @@ abstract class TableEnvironment(val config: TableConfig) { * The table to scan must be registered in the [[TableEnvironment]]'s catalog. * * @param tableName The name of the table to scan. -* @throws ValidationException if no table is registered under the given name. +* @throws TableException if no table is registered under the given name. * @return The scanned table. */ @throws[ValidationException] def scan(tableName: String): Table = { if (isRegistered(tableName)) { - new Table(this, CatalogNode(tableName, getRowType(tableName))) + new Table(this, CatalogNode(getRowType(tableName), tableName)) } else { throw new TableException(s"Table \'$tableName\' was not found in the registry.") } } /** +* Scans a table from registered external catalog and returns the resulting [[Table]]. +* +* @param catalogName The name of the catalog to look-up for the table. +* @param dbName The database name of the table to scan. +* @param tableName The table name to scan. +* @throws ExternalCatalogNotExistException if no catalog is registered under the given name. +* @throws TableException if no database/ table is found in the given catalog. +* @return The scanned table. +*/ + @throws[ExternalCatalogNotExistException] + @throws[TableException] + def scan(catalogName: String, dbName: String, tableName: String): Table = { --- End diff -- Good suggestion. ---
[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3409#discussion_r106368629 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -246,6 +251,36 @@ abstract class TableEnvironment(val config: TableConfig) { } /** +* Registers a [[ExternalCatalog]] under a unique name in the TableEnvironment's catalog. +* The databases and tables in the registered external catalog can be referenced in SQL queries. +* +* @param name The name under which the externalCatalog will be registered. +* @param externalCatalog The externalCatalog to register. +*/ + def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit = { +if (this.externalCatalogs.contains(name)) { --- End diff -- Yes. ---
[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3409#discussion_r106368614 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -246,6 +251,36 @@ abstract class TableEnvironment(val config: TableConfig) { } /** +* Registers a [[ExternalCatalog]] under a unique name in the TableEnvironment's catalog. +* The databases and tables in the registered external catalog can be referenced in SQL queries. +* +* @param name The name under which the externalCatalog will be registered. +* @param externalCatalog The externalCatalog to register. +*/ + def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit = { +if (this.externalCatalogs.contains(name)) { + throw new ExternalCatalogAlreadyExistException(name) +} +this.externalCatalogs.put(name, externalCatalog) +// create an external catalog calicte schema, register it on the root schema +ExternalCatalogSchema.create(rootSchema, name, externalCatalog) + } + + /** +* Gets a registered [[ExternalCatalog]] by name +* +* @param name The name under which the externalCatalog was previous registered. +* @return the externalCatalog found by name +*/ + def getRegisteredExternalCatalog(name: String): ExternalCatalog = { --- End diff -- We may need to look up a registered catalog later in order to call its create/update/delete methods for tables or databases. ---
[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3409#discussion_r106368344 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -99,6 +101,9 @@ abstract class TableEnvironment(val config: TableConfig) { // a counter for unique attribute names private val attrNameCntr: AtomicInteger = new AtomicInteger(0) + // registered external catalog names -> catalog + private val externalCatalogs = new HashMap[String, ExternalCatalog] --- End diff -- @fhueske, the catalogs are stored so that a registered catalog can be retrieved later. Users may want to call a catalog's create/update/delete methods for tables or databases when executing DDL in SQL, or a similar operation in the Table API. ---
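A minimal sketch of why the registry keeps the catalog objects instead of only creating Calcite schemas from them: a catalog registered once can be fetched again by name and used for DDL-style calls. Every type below is a simplified, hypothetical stand-in (the real `ExternalCatalog` and `TableEnvironment` carry much more); only the register/look-up behaviour mirrors the diff.

```scala
import scala.collection.mutable

// Simplified, hypothetical stand-in for ExternalCatalog.
trait SimpleCatalog {
  def createTable(dbName: String, tableName: String): Unit
  def listTables(dbName: String): Seq[String]
}

class InMemoryCatalog extends SimpleCatalog {
  private val tables = mutable.HashMap.empty[String, mutable.LinkedHashSet[String]]

  override def createTable(dbName: String, tableName: String): Unit = {
    tables.getOrElseUpdate(dbName, mutable.LinkedHashSet.empty[String]) += tableName
    ()
  }

  override def listTables(dbName: String): Seq[String] =
    tables.get(dbName).map(_.toList).getOrElse(Nil)
}

class CatalogAlreadyRegisteredException(name: String)
  extends RuntimeException(s"External catalog '$name' is already registered")
class CatalogNotRegisteredException(name: String)
  extends RuntimeException(s"External catalog '$name' is not registered")

// Keeps the catalog objects themselves, so they can be fetched again later.
class ExternalCatalogRegistry {
  private val externalCatalogs = mutable.HashMap.empty[String, SimpleCatalog]

  def registerExternalCatalog(name: String, catalog: SimpleCatalog): Unit = {
    if (externalCatalogs.contains(name)) throw new CatalogAlreadyRegisteredException(name)
    externalCatalogs.update(name, catalog)
  }

  def getRegisteredExternalCatalog(name: String): SimpleCatalog =
    externalCatalogs.getOrElse(name, throw new CatalogNotRegisteredException(name))
}
```

If only the derived schema were kept, a later `CREATE TABLE` against the catalog would have no object to delegate to.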
[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/3406 @fhueske, thanks for your review. I have updated the PR based on your comments. ---
[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/3406 @fhueske, thanks for your review. I have updated the PR based on your comments. Your suggestion to move the annotation from TableSource to TableSourceConverter is good, and I changed the PR accordingly. As for not using scanning at all and specifying the converter classes explicitly instead: that would work, too. However, if somebody adds a new TableSourceConverter, such as a parquetTableSourceConverter, users would need to change the code or a config file to register the newly added converter, right? The scanning approach finds all converters automatically. Let me know what you think, thanks. ---
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r105384191 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CatalogTableHelper.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.io.IOException +import java.lang.reflect.Modifier +import java.net.URL +import java.util.Properties + +import org.apache.flink.table.annotation.ExternalCatalogCompatible +import org.apache.flink.table.api.ExternalCatalogTableTypeNotExistException +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.sources.TableSource +import org.apache.flink.util.InstantiationUtil +import org.reflections.Reflections +import org.slf4j.{Logger, LoggerFactory} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap + +/** + * The utility class is used to convert ExternalCatalogTable to TableSourceTable. 
+ */ +object CatalogTableHelper { + + // config file to specifier the scan package to search tableSources + // which is compatible with external catalog. + private val tableSourceConfigFileName = "externalCatalogTable.properties" --- End diff -- Fabian, registering all needed converter classes instead of scanning would work, too. However, if somebody adds a new TableSourceConverter, such as a parquetTableSourceConverter, wouldn't users need to change the code or config file to include the newly added converter? ---
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r105363822 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.util + +/** + * This class is responsible for interact with external catalog. 
+ * Its main responsibilities including: + * + * create/drop/alter database or tables for DDL operations + * provide tables for calcite catalog, it looks up databases or tables in the external catalog + * + */ +trait ExternalCatalog { + + /** +* Adds table into external Catalog +* +* @param table description of table which to create +* @param ignoreIfExists whether to ignore operation if table already exists +*/ + def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit + + /** +* Deletes table from external Catalog +* +* @param dbName database name +* @param tableName table name +* @param ignoreIfNotExists whether to ignore operation if table not exist yet +*/ + def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit + + /** +* Modifies an existing table in the external catalog +* +* @param table description of table which to modify +* @param ignoreIfNotExists whether to ignore operation if table not exist yet +*/ + def alterTable(table: ExternalCatalogTable, ignoreIfNotExists: Boolean): Unit + + /** +* Gets table from external Catalog +* +* @param dbName database name +* @param tableName table name +* @return table +*/ + def getTable(dbName: String, tableName: String): ExternalCatalogTable --- End diff -- I will add more comments to this method. Thanks! ---
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r105363598 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.util + +/** + * This class is responsible for interact with external catalog. + * Its main responsibilities including: + * + * create/drop/alter database or tables for DDL operations + * provide tables for calcite catalog, it looks up databases or tables in the external catalog + * + */ +trait ExternalCatalog { + + /** +* Adds table into external Catalog +* +* @param table description of table which to create +* @param ignoreIfExists whether to ignore operation if table already exists +*/ + def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit --- End diff -- Hi, Fabian. Even though getTable(), getDataBase(), and listTables() are sufficient for the integration with Calcite, a complete ExternalCatalog should be responsible for CRUD operations on databases and tables.
A read-only ExternalCatalog could choose not to support the createX, dropX, and alterX methods, for example by throwing UnsupportedOperationException. ---
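A read-only catalog of the kind described above might look like this. The trait `TableCatalog` is a simplified, hypothetical stand-in for `ExternalCatalog` (tables are identified by database and table name only), and the snapshot-backed `ReadOnlyCatalog` is an assumption made for illustration.

```scala
// Simplified, hypothetical stand-in for the ExternalCatalog trait:
// tables are identified by (dbName, tableName) only.
trait TableCatalog {
  def createTable(dbName: String, tableName: String, ignoreIfExists: Boolean): Unit
  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
  def alterTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
  def tableExists(dbName: String, tableName: String): Boolean
  def listTables(dbName: String): Seq[String]
}

// Read-only catalog backed by a fixed snapshot: lookups work, DDL is rejected.
class ReadOnlyCatalog(snapshot: Map[String, Seq[String]]) extends TableCatalog {

  private def unsupported(op: String): Nothing =
    throw new UnsupportedOperationException(s"$op is not supported by a read-only catalog")

  override def createTable(dbName: String, tableName: String, ignoreIfExists: Boolean): Unit =
    unsupported("createTable")

  override def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit =
    unsupported("dropTable")

  override def alterTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit =
    unsupported("alterTable")

  override def tableExists(dbName: String, tableName: String): Boolean =
    snapshot.getOrElse(dbName, Seq.empty).contains(tableName)

  override def listTables(dbName: String): Seq[String] =
    snapshot.getOrElse(dbName, Seq.empty)
}
```

A later revision in this thread takes the other route and splits the interface, with `CrudExternalCatalog extends ExternalCatalog` carrying the mutating methods, so read-only catalogs do not have to implement them at all.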
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r105362031 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CatalogTableHelper.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.io.IOException +import java.lang.reflect.Modifier +import java.net.URL +import java.util.Properties + +import org.apache.flink.table.annotation.ExternalCatalogCompatible +import org.apache.flink.table.api.ExternalCatalogTableTypeNotExistException +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.sources.TableSource +import org.apache.flink.util.InstantiationUtil +import org.reflections.Reflections +import org.slf4j.{Logger, LoggerFactory} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap + +/** + * The utility class is used to convert ExternalCatalogTable to TableSourceTable. 
+ */ +object CatalogTableHelper { + + // config file to specifier the scan package to search tableSources + // which is compatible with external catalog. + private val tableSourceConfigFileName = "externalCatalogTable.properties" --- End diff -- Hi, Fabian. I'm a little confused. Which approach do you mean? 1. We could add a pkgsToScan field to TableConfig, and users would be responsible for setting it, e.g. tableConfig.setPkgsToScan("org.apache.flink.table.sources", "org.apache.flink.streaming.connectors.kafka")? Then users would need to know exactly which module every needed converter belongs to. 2. We could add a converterConfigFileName field to TableConfig, and users would be responsible for setting it, e.g. tableConfig.setConverterConfigFileName("externalCatalogTable.properties")? This approach is a little strange, because the file name is fixed by convention and is not meant to change. ---
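The convention-based approach (a fixed file name, with each jar shipping its own copy) can be sketched with `ClassLoader.getResources`, which returns every resource with the given name visible to the class loader. The property key `scan.packages` and the comma-separated format are assumptions: the actual layout of the config file is not shown in this thread, and `ScanPackageConfig` is a hypothetical name.

```scala
import java.net.URL
import java.util.Properties
import scala.collection.mutable.ArrayBuffer

object ScanPackageConfig {

  // Assumed property key; the real file format is not shown in this thread.
  val ScanPackagesKey = "scan.packages"

  /** Collects the packages listed in every resource named `fileName` on the class path. */
  def scanPackages(loader: ClassLoader, fileName: String): List[String] = {
    val packages = ArrayBuffer.empty[String]
    // One URL per jar or directory that ships its own copy of the file.
    val urls = loader.getResources(fileName)
    while (urls.hasMoreElements) {
      packages ++= parse(urls.nextElement())
    }
    packages.toList.distinct
  }

  private def parse(url: URL): Seq[String] = {
    val props = new Properties()
    val in = url.openStream()
    try props.load(in) finally in.close()
    Option(props.getProperty(ScanPackagesKey)).toSeq
      .flatMap(_.split(",").toSeq)
      .map(_.trim)
      .filter(_.nonEmpty)
  }
}
```

Because every jar contributes its own copy of the fixed-name file, adding a connector jar to the class path is enough for its packages to be scanned, without any user-side configuration.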
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r105362292 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.util + +/** + * This class is responsible for interact with external catalog. + * Its main responsibilities including: + * + * create/drop/alter database or tables for DDL operations + * provide tables for calcite catalog, it looks up databases or tables in the external catalog + * + */ +trait ExternalCatalog { + + /** +* Adds table into external Catalog +* +* @param table description of table which to create +* @param ignoreIfExists whether to ignore operation if table already exists --- End diff -- Good advice, I will change the comment. ---
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r105316858 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/ExternalCatalogCompatible.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.annotation; + +import org.apache.flink.annotation.Public; +import org.apache.flink.table.catalog.TableSourceConverter; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * A tableSource with this annotation represents it is compatible with external catalog, that is, + * an instance of this tableSource can be converted to or converted from external catalog table + * instance. 
+ * The annotation contains the following information: + * + * external catalog table type name for this kind of tableSource + * external catalog table <-> tableSource converter class + * + */ +@Documented +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Public +public @interface ExternalCatalogCompatible { --- End diff -- This is a good suggestion, thanks.
[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/3406 @fhueske, thanks for your review. I have updated the PR based on your suggestions, except for one point: adding a version field to ExternalCatalogCompatible. Could we instead define tableType as an identifier that already includes the version information? For example: kafka0.8 / kafka0.9 / kafka1.0, or hive1.0 / hive2.0.
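The proposal above folds the version into the tableType identifier instead of adding a separate version attribute. A minimal sketch, assuming a hypothetical annotation that carries only a tableType string (the real ExternalCatalogCompatible in this PR also references a converter class, omitted here): each connector version simply gets its own identifier, and the identifier can be read back at runtime through reflection because the annotation has RUNTIME retention.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical annotation: the version is part of the identifier, e.g. "kafka0.9" or "hive2.0".
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME) // required so getAnnotation can see it at runtime
@interface ExternalCatalogCompatibleSketch {
    String tableType();
}

// Two hypothetical table sources for different Kafka versions.
@ExternalCatalogCompatibleSketch(tableType = "kafka0.8")
class Kafka08SourceSketch {}

@ExternalCatalogCompatibleSketch(tableType = "kafka0.9")
class Kafka09SourceSketch {}

class TableTypeLookup {
    // Returns the versioned identifier, or null if the class is not annotated.
    static String tableTypeOf(Class<?> clazz) {
        ExternalCatalogCompatibleSketch a = clazz.getAnnotation(ExternalCatalogCompatibleSketch.class);
        return a == null ? null : a.tableType();
    }
}
```

The trade-off is that version comparisons (for example "any Kafka 0.9 or newer") would require parsing the identifier, whereas a separate version field keeps the two concerns apart.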
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r104401203 --- Diff: flink-libraries/flink-table/pom.xml --- @@ -92,7 +92,12 @@ under the License. - + --- End diff -- Thanks a lot. I will exclude com.google.code.findbugs.*, whose license is LGPL.
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r104401325 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/ExternalCatalogCompatible.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.annotation; + +import org.apache.flink.annotation.Public; +import org.apache.flink.table.catalog.TableSourceConverter; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * A tableSource with this annotation represents it is compatible with external catalog, that is, + * an instance of this tableSource can be converted to or converted from external catalog table + * instance. 
+ * The annotation contains the following information: + * + * external catalog table type name for this kind of tableSource + * external catalog table <-> tableSource converter class + * + */ +@Documented +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Public +public @interface ExternalCatalogCompatible { --- End diff -- Could we define tableType as an identifier that includes the version information? For example: kafka0.8 / kafka0.9 / kafka1.0, or hive1.0 / hive2.0.
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r104394065 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.util + +import org.apache.calcite.linq4j.tree.Expression +import org.apache.calcite.schema._ +import org.apache.flink.table.api.{DatabaseNotExistException, TableNotExistException} +import org.slf4j.{Logger, LoggerFactory} + +import scala.collection.JavaConverters._ + +/** + * This class is responsible for connect external catalog to calcite catalog. + * In this way, it is possible to look-up and access tables in SQL queries + * without registering tables in advance. + * The databases in the external catalog registers as calcite sub-Schemas of current schema. + * The tables in a given database registers as calcite tables + * of the [[ExternalCatalogDatabaseSchema]]. 
+ * + * @param catalogIdentifier external catalog name + * @param catalog external catalog + */ +class ExternalCatalogSchema( +catalogIdentifier: String, +catalog: ExternalCatalog) extends Schema { + + private val LOG: Logger = LoggerFactory.getLogger(this.getClass) + + /** +* Looks up database by the given sub-schema name in the external catalog, +* returns it Wrapped in a [[ExternalCatalogDatabaseSchema]] with the given database name. +* +* @param name Sub-schema name +* @return Sub-schema with a given name, or null +*/ + override def getSubSchema(name: String): Schema = { +try { + val db = catalog.getDatabase(name) + if (db != null) { +new ExternalCatalogDatabaseSchema(db.dbName, catalog) + } else { +null + } +} catch { + case e: DatabaseNotExistException => +LOG.warn(s"database $name does not exist in externalCatalog $catalogIdentifier") +null +} + } + + /** +* Lists the databases of the external catalog, +* returns the lists as the names of this schema's sub-schemas. +* +* @return names of this schema's child schemas +*/ + override def getSubSchemaNames: util.Set[String] = catalog.listDatabases().toSet.asJava + + override def getTable(name: String): Table = null + + override def isMutable: Boolean = true + + override def getFunctions(name: String): util.Collection[Function] = +util.Collections.emptyList[Function] + + override def getExpression(parentSchema: SchemaPlus, name: String): Expression = +Schemas.subSchemaExpression(parentSchema, name, getClass) + + override def getFunctionNames: util.Set[String] = util.Collections.emptySet[String] + + override def getTableNames: util.Set[String] = util.Collections.emptySet[String] + + override def contentsHaveChangedSince(lastCheck: Long, now: Long): Boolean = false --- End diff -- Oh, yes, my bad. I will change it.
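The getSubSchema method quoted above converts a DatabaseNotExistException into null, which is how Calcite's Schema#getSubSchema signals "no sub-schema with this name". A minimal Java sketch of the same pattern, assuming hypothetical stand-ins for the catalog and exception types in the diff (a database is modeled as a set of table names):

```java
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the exception thrown by the external catalog.
class DatabaseNotExistException extends Exception {
    DatabaseNotExistException(String db) {
        super("Database " + db + " does not exist");
    }
}

// Hypothetical catalog: database name -> table names.
class CatalogSketch {
    private final Map<String, Set<String>> databases;

    CatalogSketch(Map<String, Set<String>> databases) {
        this.databases = databases;
    }

    Set<String> getDatabase(String name) throws DatabaseNotExistException {
        Set<String> db = databases.get(name);
        if (db == null) {
            throw new DatabaseNotExistException(name);
        }
        return db;
    }
}

class SchemaLookupSketch {
    // Mirrors the diff: a missing database is logged and reported as null, so the caller
    // can treat the name as unresolved instead of failing with a catalog exception.
    static Set<String> getSubSchema(CatalogSketch catalog, String name) {
        try {
            return catalog.getDatabase(name);
        } catch (DatabaseNotExistException e) {
            System.err.println("database " + name + " does not exist");
            return null;
        }
    }
}
```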
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r104371759 --- Diff: flink-libraries/flink-table/pom.xml --- @@ -92,7 +92,12 @@ under the License. - + --- End diff -- Fabian, about the reflections jar: its license is WTFPL. The WTFPL only says 'Everyone is permitted to copy and distribute verbatim or modified copies of this license document'; it does not explicitly say that it is compatible with the ASL. Besides, I also want to include the classes of this jar in the generated flink-table-{version}.jar, just like the Calcite classes. I don't know whether that is allowed.
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r104343965 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala --- @@ -68,6 +62,90 @@ object CommonTestData { ) } + def getMockedFlinkExternalCatalog: ExternalCatalog = { +val csvRecord1 = Seq( + "1#1#Hi", + "2#2#Hello", + "3#2#Hello world" +) +val tempFilePath1 = writeToTempFile(csvRecord1.mkString("$"), "csv-test1", "tmp") +val externalCatalogTable1 = ExternalCatalogTable( + TableIdentifier("db1", "tb1"), + "csv", + DataSchema( +Array( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO), +Array("a", "b", "c") + ), + properties = Map( +"path" -> tempFilePath1, +"fieldDelim" -> "#", +"rowDelim" -> "$" + ) +) + +val csvRecord2 = Seq( + "1#1#0#Hallo#1", + "2#2#1#Hallo Welt#2", + "2#3#2#Hallo Welt wie#1", + "3#4#3#Hallo Welt wie gehts?#2", + "3#5#4#ABC#2", + "3#6#5#BCD#3", + "4#7#6#CDE#2", + "4#8#7#DEF#1", + "4#9#8#EFG#1", + "4#10#9#FGH#2", + "5#11#10#GHI#1", + "5#12#11#HIJ#3", + "5#13#12#IJK#3", + "5#14#13#JKL#2", + "5#15#14#KLM#2" +) +val tempFilePath2 = writeToTempFile(csvRecord2.mkString("$"), "csv-test2", "tmp") +val externalCatalogTable2 = ExternalCatalogTable( + TableIdentifier("db2", "tb2"), + "csv", + DataSchema( +Array( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO), +Array("d", "e", "f", "g", "h") + ), + properties = Map( +"path" -> tempFilePath2, +"fieldDelim" -> "#", +"rowDelim" -> "$" + ) +) +val catalog = mock(classOf[ExternalCatalog]) --- End diff -- Good idea.
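The mocked test data above writes each table as one string, joining rows with "$" and fields with "#" (matching the rowDelim and fieldDelim properties). A small sketch of reading such a string back, for illustration only (the class and method names are hypothetical, and a real CSV source also has to deal with quoting and escaping): because String.split takes a regular expression and "$" is the end-of-input anchor, the delimiters must be passed through Pattern.quote.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

class DelimitedParserSketch {
    // Splits text whose rows are joined by rowDelim and whose fields are joined by fieldDelim.
    // Pattern.quote turns the delimiters into literal patterns, so "$" is not treated as an anchor.
    static List<String[]> parse(String text, String rowDelim, String fieldDelim) {
        List<String[]> rows = new ArrayList<>();
        for (String row : text.split(Pattern.quote(rowDelim))) {
            rows.add(row.split(Pattern.quote(fieldDelim)));
        }
        return rows;
    }
}
```

Parsing "1#1#Hi$2#2#Hello$3#2#Hello world" with rowDelim "$" and fieldDelim "#" yields three rows of three fields each; an unquoted split("$") would not split on the literal character at all.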
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r104343679 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala --- @@ -68,6 +62,90 @@ object CommonTestData { ) } + def getMockedFlinkExternalCatalog: ExternalCatalog = { +val csvRecord1 = Seq( --- End diff -- val csvRecord1 = Seq(...) only holds CSV data, so maybe it would be better to name it csvDataRecords?
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r104340593 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog + +import java.util + +import org.apache.calcite.linq4j.tree.Expression +import org.apache.calcite.schema._ +import org.apache.flink.table.api.{DatabaseNotExistException, TableNotExistException} +import org.slf4j.{Logger, LoggerFactory} + +import scala.collection.JavaConverters._ + +/** + * This class is responsible for connect external catalog to calcite catalog. + * In this way, it is possible to look-up and access tables in SQL queries + * without registering tables in advance. + * The databases in the external catalog registers as calcite sub-Schemas of current schema. + * The tables in a given database registers as calcite tables + * of the [[ExternalCatalogDatabaseSchema]]. 
+ * + * @param catalogIdentifier external catalog name + * @param catalog external catalog + */ +class ExternalCatalogSchema( +catalogIdentifier: String, +catalog: ExternalCatalog) extends Schema { + + private val LOG: Logger = LoggerFactory.getLogger(this.getClass) + + /** +* Looks up database by the given sub-schema name in the external catalog, +* returns it Wrapped in a [[ExternalCatalogDatabaseSchema]] with the given database name. +* +* @param name Sub-schema name +* @return Sub-schema with a given name, or null +*/ + override def getSubSchema(name: String): Schema = { +try { + val db = catalog.getDatabase(name) + if (db != null) { +new ExternalCatalogDatabaseSchema(db.dbName, catalog) + } else { +null + } +} catch { + case e: DatabaseNotExistException => +LOG.warn(s"database $name does not exist in externalCatalog $catalogIdentifier") +null +} + } + + /** +* Lists the databases of the external catalog, +* returns the lists as the names of this schema's sub-schemas. +* +* @return names of this schema's child schemas +*/ + override def getSubSchemaNames: util.Set[String] = catalog.listDatabases().toSet.asJava + + override def getTable(name: String): Table = null + + override def isMutable: Boolean = true + + override def getFunctions(name: String): util.Collection[Function] = +util.Collections.emptyList[Function] + + override def getExpression(parentSchema: SchemaPlus, name: String): Expression = +Schemas.subSchemaExpression(parentSchema, name, getClass) + + override def getFunctionNames: util.Set[String] = util.Collections.emptySet[String] + + override def getTableNames: util.Set[String] = util.Collections.emptySet[String] + + override def contentsHaveChangedSince(lastCheck: Long, now: Long): Boolean = false --- End diff -- I set contentsHaveChangedSince to false because we want to fetch the latest information from the underlying ExternalCatalog instead of using Calcite's caches, since the table list and database list of the ExternalCatalog may change at any time.
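Whether true or false disables caching depends on how the caller uses contentsHaveChangedSince. A simplified model, loosely based on our understanding of Calcite's caching schema wrapper (the class and interface names here are hypothetical, and this is not Calcite's code): the first access always loads, and later accesses reload only when the schema reports a change. Under this model, it is returning true, not false, that forces a fresh lookup against the external catalog on every access.

```java
import java.util.function.Supplier;

// Hypothetical cache wrapper: rebuilds its value only when the schema reports a change.
class CachedSketch<T> {
    // Stand-in for the schema method under discussion.
    interface ChangeCheck {
        boolean contentsHaveChangedSince(long lastCheck, long now);
    }

    private final ChangeCheck schema;
    private final Supplier<T> loader;
    private T value;
    private long checked = Long.MIN_VALUE; // MIN_VALUE means "never loaded"

    CachedSketch(ChangeCheck schema, Supplier<T> loader) {
        this.schema = schema;
        this.loader = loader;
    }

    T get(long now) {
        // First access always loads; later accesses reload only if the schema says it changed.
        if (checked == Long.MIN_VALUE || schema.contentsHaveChangedSince(checked, now)) {
            value = loader.get();
        }
        checked = now;
        return value;
    }
}
```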
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3406#discussion_r104333889 --- Diff: flink-libraries/flink-table/pom.xml --- @@ -92,7 +92,12 @@ under the License. - + --- End diff -- Yes, this problem bothers me, too. The license of the reflections jar is WTFPL. Does that mean we can do anything with it, and how can we check whether it is compatible with AL2? I noticed that the reflections jar is already referenced in flink-parent.pom, but only for tests, so it is not included in any Flink jar. My original idea was not only to use this jar but also to bundle it (like Calcite) into the flink-table-{version}.jar.
[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/3409 [flink-5570] [Table API & SQL]Support register external catalog to table environment This PR adds support for registering an external catalog in the TableEnvironment. The PR contains two commits. The first one addresses https://issues.apache.org/jira/browse/FLINK-5568; its content is the same as https://github.com/apache/flink/pull/3406. The second commit adds support for external catalog registration, so please focus on the second commit when you review this PR. The main changes in the second commit include:
1. add a registerExternalCatalog method to TableEnvironment to register an external catalog
2. add a scan method to TableEnvironment to scan the tables of an external catalog
3. add test cases for ExternalCatalog, covering registration and scan
You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink flink-5570 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3409.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3409 commit d0e1ab20078adc4f788e9c2d2c167f0251ae3476 Author: jingzhang <beyond1...@126.com> Date: 2017-02-22T11:28:08Z Introduce interface for external catalog, and provide an in-memory implementation for test or develop. Integrate with calcite catalog. commit 05e2b13847fab01e330d4bf2232886a793f7dd0c Author: jingzhang <beyond1...@126.com> Date: 2017-02-24T06:10:50Z Support register external catalog to table environment
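The registration-plus-scan flow described in this PR can be sketched as follows. The actual method signatures in the PR are not shown in this thread, so everything below is hypothetical: catalogs are registered under a unique name, and scan resolves a three-part path (catalog, database, table).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical catalog: returns a table description, or null if the table is unknown.
interface ExternalCatalogSketch {
    String getTable(String db, String table);
}

// Hypothetical environment illustrating registerExternalCatalog and scan.
class TableEnvironmentSketch {
    private final Map<String, ExternalCatalogSketch> catalogs = new HashMap<>();

    // Registers a catalog under a unique name; duplicate names are rejected.
    void registerExternalCatalog(String name, ExternalCatalogSketch catalog) {
        if (catalogs.putIfAbsent(name, catalog) != null) {
            throw new IllegalArgumentException("Catalog " + name + " is already registered");
        }
    }

    // Resolves catalog.database.table and returns the table description.
    String scan(String... path) {
        if (path.length != 3) {
            throw new IllegalArgumentException("Expected catalog.db.table but got " + Arrays.toString(path));
        }
        ExternalCatalogSketch catalog = catalogs.get(path[0]);
        if (catalog == null) {
            throw new IllegalArgumentException("Unknown catalog " + path[0]);
        }
        String table = catalog.getTable(path[1], path[2]);
        if (table == null) {
            throw new IllegalArgumentException("Table " + String.join(".", path) + " not found");
        }
        return table;
    }
}
```

Rejecting duplicate catalog names up front keeps path resolution unambiguous; whether the PR does the same is not stated here.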
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/3406 [flink-5568] [Table API & SQL]Introduce interface for catalog, and provide an in-memory implementation. Integrate external catalog with calcite catalog This PR introduces an interface for external catalogs, provides an in-memory implementation for testing and development, and finally integrates the external catalog with the Calcite catalog. The main changes include:
1. Introduce the ExternalCatalog abstraction, including ExternalCatalogDatabase as a database in the catalog and ExternalCatalogTable as a table in the catalog.
2. Provide an in-memory implementation for testing and development.
3. Introduce ExternalCatalogSchema, an implementation of Calcite's Schema interface. It registers the databases in the ExternalCatalog as Calcite schemas, and the tables in a database as Calcite tables.
4. Add the ExternalCatalogCompatible annotation. A TableSource with this annotation can be converted to or from an ExternalCatalogTable. ExternalCatalogTableConverter is the converter between ExternalCatalogTable and TableSource.
5. Introduce the CatalogTableHelper utility. It has two responsibilities:
* automatically find the TableSources annotated with ExternalCatalogCompatible;
* convert an ExternalCatalogTable instance to a TableSourceTable instance.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink dev Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3406.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3406 commit d0e1ab20078adc4f788e9c2d2c167f0251ae3476 Author: jingzhang <beyond1...@126.com> Date: 2017-02-22T11:28:08Z Introduce interface for external catalog, and provide an in-memory implementation for test or develop. Integrate with calcite catalog.
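Item 5 above (converting a catalog table to a table source) amounts to a lookup keyed by the table's type identifier. A minimal sketch under two assumptions not taken from the PR: converters are registered manually (the PR instead discovers them by scanning for the ExternalCatalogCompatible annotation), and a table source is represented as a plain Object built from the table's properties. All names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical catalog table: a type identifier plus free-form properties.
class CatalogTableSketch {
    final String tableType;
    final Map<String, String> properties;

    CatalogTableSketch(String tableType, Map<String, String> properties) {
        this.tableType = tableType;
        this.properties = properties;
    }
}

// Hypothetical helper that picks a converter by the table's type identifier.
class CatalogTableHelperSketch {
    private final Map<String, Function<Map<String, String>, Object>> converters = new HashMap<>();

    void register(String tableType, Function<Map<String, String>, Object> converter) {
        converters.put(tableType, converter);
    }

    // Converts a catalog table into a table source, failing fast for unknown types.
    Object toTableSource(CatalogTableSketch table) {
        Function<Map<String, String>, Object> converter = converters.get(table.tableType);
        if (converter == null) {
            throw new IllegalArgumentException("No converter for table type " + table.tableType);
        }
        return converter.apply(table.properties);
    }
}
```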
[GitHub] flink issue #3197: [FLINK-5567] [Table API & SQL]Introduce and migrate curre...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/3197 @fhueske, thanks for your review. I have modified the code based on your advice and rebased the PR.
[GitHub] flink issue #3196: [FLINK-5566] [Table API & SQL]Introduce structure to hold...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/3196 @fhueske, thanks for your review. I modified the code based on your advice, including Java compatibility and the types of the column stats fields.
[GitHub] flink pull request #3196: [FLINK-5566] [Table API & SQL]Introduce structure ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3196#discussion_r100958745 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/ColumnStats.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.stats + +/** + * column statistics + * + * @param ndv number of distinct values + * @param nullCount number of nulls + * @param avgLen average length of column values + * @param maxLen max length of column values + * @param max max value of column values + * @param min min value of column values + */ +case class ColumnStats( +ndv: Long, +nullCount: Long, +avgLen: Long, +maxLen: Long, +max: Option[Any], +min: Option[Any]) { --- End diff -- It makes sense to add a field denoting whether the stats are precise or approximate, and a field holding the timestamp when the stats were generated. However, I'm not sure how these two fields would affect the optimized plan, because we prefer to use the provided stats even if they are estimated or a little stale. So I haven't added these two fields for now; maybe I will add them later. What do you think?
[GitHub] flink pull request #3196: [FLINK-5566] [Table API & SQL]Introduce structure ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3196#discussion_r100951579 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/ColumnStats.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.stats + +/** + * column statistics + * + * @param ndv number of distinct values + * @param nullCount number of nulls + * @param avgLen average length of column values + * @param maxLen max length of column values + * @param max max value of column values + * @param min min value of column values + */ +case class ColumnStats( +ndv: Long, +nullCount: Long, +avgLen: Long, --- End diff -- OK.
[GitHub] flink pull request #3196: [FLINK-5566] [Table API & SQL]Introduce structure ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3196#discussion_r100950729 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/ColumnStats.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.stats + +/** + * column statistics + * + * @param ndv number of distinct values + * @param nullCount number of nulls + * @param avgLen average length of column values + * @param maxLen max length of column values + * @param max max value of column values + * @param min min value of column values + */ +case class ColumnStats( +ndv: Long, --- End diff -- @fhueske, there is no need to make all stats optional. If there are no statistics for ndv/nullCount/avgLen/maxLen, we can give them an invalid value, e.g., -1. That does not work for max/min, however, because the max/min values may be negative.
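The sentinel-versus-Option argument above can be sketched in Java as follows, assuming a hypothetical ColumnStatsSketch class that mirrors the fields of the Scala ColumnStats case class (it is not Flink code): counts and lengths can never be negative, so -1 unambiguously means "unknown", whereas any value, including a negative one, is a legitimate min or max, so those two fields use Optional.

```java
import java.util.Optional;

// Hypothetical Java rendering of the ColumnStats design discussed above.
final class ColumnStatsSketch {
    // Counts and lengths are never negative, so -1 is a safe "unknown" marker.
    static final long UNKNOWN = -1L;

    final long ndv;
    final long nullCount;
    final long avgLen;
    final long maxLen;
    // Min/max may legitimately be negative, so a sentinel would be ambiguous: use Optional.
    final Optional<Object> max;
    final Optional<Object> min;

    ColumnStatsSketch(long ndv, long nullCount, long avgLen, long maxLen,
                      Optional<Object> max, Optional<Object> min) {
        this.ndv = ndv;
        this.nullCount = nullCount;
        this.avgLen = avgLen;
        this.maxLen = maxLen;
        this.max = max;
        this.min = min;
    }

    boolean hasNdv() {
        return ndv != UNKNOWN;
    }

    boolean hasMin() {
        return min.isPresent();
    }
}
```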
[GitHub] flink issue #2791: [FLINK-4449] [cluster management] add Heartbeat manager i...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2791 @tillrohrmann, thanks for your reply. However, my partner @wangzhijiang999 had already worked on a different implementation for this JIRA issue, and he will open a new PR soon, so I will close this PR. Sorry for taking up your time.
[GitHub] flink pull request #2791: [FLINK-4449] [cluster management] add Heartbeat ma...
Github user beyond1920 closed the pull request at: https://github.com/apache/flink/pull/2791
[GitHub] flink pull request #3197: [FLINK-5567] [Table API & SQL]Introduce and migrat...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/3197 [FLINK-5567] [Table API & SQL]Introduce and migrate current table statistics to FlinkStatistics This PR includes two commits. The first commit is related to https://github.com/apache/flink/pull/3196; the second commit introduces FlinkStatistics and migrates the current table statistics to it, so please focus on the second commit. The main changes include:
1. Introduce the FlinkStatistic class, which is an implementation of Calcite's Statistic interface.
2. Integrate FlinkStatistic with FlinkTable.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink flink-5567 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3197.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3197 commit cadc16eefb0e0a9002e536a48b4b9f6824b6ab23 Author: 槿ç <jinyu...@alibaba-inc.com> Date: 2017-01-24T06:34:01Z Introduce structure to hold table and column level statistics commit 56c51b0f8d7983b8593946f64ece2b4881f0d723 Author: 槿ç <jinyu...@alibaba-inc.com> Date: 2017-01-24T06:57:08Z ntroduce and migrate current table statistics to FlinkStatistics
[GitHub] flink pull request #3196: [FLINK-5566] [Table API & SQL]Introduce structure ...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/3196 [FLINK-5566] [Table API & SQL]Introduce structure to hold table and column level statistics This PR introduces structures to hold table-level and column-level statistics. TableStats: responsible for holding table-level statistics. ColumnStats: responsible for holding column-level statistics. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink flink-5566 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3196.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3196 commit cadc16eefb0e0a9002e536a48b4b9f6824b6ab23 Author: 槿ç <jinyu...@alibaba-inc.com> Date: 2017-01-24T06:34:01Z Introduce structure to hold table and column level statistics
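The two holders described above can be sketched as simple immutable value classes. This is a minimal sketch only. The column fields chosen here (ndv, nullCount, min, max) are assumptions for illustration, and they are nullable because a source may not know every statistic.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Column-level statistics (fields are illustrative assumptions; each may be null).
final class ColumnStats {
    final Long ndv;        // number of distinct values
    final Long nullCount;  // number of null values
    final Number min;
    final Number max;

    ColumnStats(Long ndv, Long nullCount, Number min, Number max) {
        this.ndv = ndv;
        this.nullCount = nullCount;
        this.min = min;
        this.max = max;
    }
}

// Table-level statistics: a row count plus optional per-column statistics.
final class TableStats {
    final long rowCount;
    final Map<String, ColumnStats> colStats;

    TableStats(long rowCount, Map<String, ColumnStats> colStats) {
        this.rowCount = rowCount;
        // Defensive, read-only copy so the statistics cannot change after creation.
        this.colStats = Collections.unmodifiableMap(new HashMap<>(colStats));
    }

    /** Returns the statistics of a column, or null if none were collected. */
    ColumnStats column(String name) {
        return colStats.get(name);
    }

    public static void main(String[] args) {
        Map<String, ColumnStats> cols = new HashMap<>();
        cols.put("id", new ColumnStats(100L, 0L, 0, 99));
        TableStats stats = new TableStats(100L, cols);
        System.out.println(stats.rowCount + " rows, ndv(id) = " + stats.column("id").ndv);
    }
}
```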
[GitHub] flink pull request #3058: [FLINK-5394] [Table API & SQL]the estimateRowCount...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3058#discussion_r96112195 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala --- @@ -71,6 +72,21 @@ class DataSetSort( ) } + override def estimateRowCount(metadata: RelMetadataQuery): Double = { +val inputRowCnt = metadata.getRowCount(this.getInput) +if (inputRowCnt == null) { + inputRowCnt +} else { + val rowCount = Math.max(inputRowCnt - limitStart, 0D) --- End diff -- Yes, that makes sense. I have already fixed it. Thanks @fhueske.
[GitHub] flink pull request #3058: [FLINK-5394] [Table API & SQL]the estimateRowCount...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/3058#discussion_r95967156 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala --- @@ -71,6 +72,21 @@ class DataSetSort( ) } + override def estimateRowCount(metadata: RelMetadataQuery): Double = { +val inputRowCnt = metadata.getRowCount(this.getInput) +if (inputRowCnt == null) { + inputRowCnt +} else { + val rowCount = Math.max(inputRowCnt - limitStart, 0D) --- End diff -- Hmm, the estimate is 0 only if inputRowCount <= start and (fetch is null or fetchValue <= 0). I think this estimate is reasonable in that case. Besides, why return 1 instead of 0.1, 0.01, or some other value?
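The estimate under discussion can be sketched outside the planner as follows. The diff only shows the OFFSET part (`Math.max(inputRowCnt - limitStart, 0D)`). The FETCH handling below follows the review discussion and is an assumption, as is passing offset and fetch as plain numbers; in the planner they are RexNode literals. `SortRowCount` is a hypothetical name.

```java
// Sketch of a row-count estimate for a sort with OFFSET/FETCH (hypothetical helper).
final class SortRowCount {
    /**
     * @param inputRowCnt estimated input rows, or null if unknown
     * @param offset      rows skipped by OFFSET (0 if absent)
     * @param fetch       maximum rows returned by FETCH/LIMIT, or null if absent
     * @return estimated output rows, or null if the input estimate is unknown
     */
    static Double estimate(Double inputRowCnt, long offset, Long fetch) {
        if (inputRowCnt == null) {
            return null; // propagate "unknown" instead of guessing
        }
        // Rows left after skipping the offset, never negative.
        double rowCount = Math.max(inputRowCnt - offset, 0d);
        if (fetch != null) {
            // FETCH caps the result; a non-positive fetch yields 0 rows.
            rowCount = Math.min(rowCount, Math.max(fetch, 0L));
        }
        return rowCount;
    }

    public static void main(String[] args) {
        System.out.println(estimate(10.0, 3, null));
        System.out.println(estimate(10.0, 3, 5L));
    }
}
```

Returning exactly 0 (rather than clamping to 1) is the design choice debated above. A caller that wants to avoid zero-row estimates could clamp the result itself.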
[GitHub] flink pull request #3058: [FLINK-5394] [Table API & SQL]the estimateRowCount...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/3058 [FLINK-5394] [Table API & SQL]the estimateRowCount method of DataSetCalc didn't work This PR fixes the bug reported in https://issues.apache.org/jira/browse/FLINK-5394. The main changes include: 1. Add FlinkRelMdRowCount and FlinkDefaultRelMetadataProvider to override getRowCount for some Flink RelNodes. 2. Add a getRowCount method for DataSetSort to provide a more accurate estimate. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink flink-5394 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3058.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3058 commit 8099920fb8759ed1068e7b8153816a7b63089e45 Author: beyond1920 <beyond1...@126.com> Date: 2016-12-29T07:52:17Z the estimateRowCount method of DataSetCalc didn't work now, fix it
[GitHub] flink issue #2923: [FLINK-5220] [Table API & SQL] Flink SQL projection pushd...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2923 @fhueske, thanks for your review. I have already modified the code based on your advice.
[GitHub] flink pull request #2923: [FLINK-5220] [Table API & SQL] Flink SQL projectio...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2923#discussion_r91847990 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.batch + +import org.apache.flink.api.common.io.GenericInputFormat +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment => JavaExecEnv} +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource} +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit.{Before, Test} +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class ProjectableTableSourceITCase(mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + private val tableName = "MyTable" + private var tableEnv: BatchTableEnvironment = null + + @Before + def initTableEnv(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +tableEnv = TableEnvironment.getTableEnvironment(env, config) +tableEnv.registerTableSource(tableName, new TestProjectableTableSource) + } + + @Test + def testTableAPI(): Unit = { +val results = tableEnv + .scan(tableName) + .where("amount < 4") + .select("id, name") + .collect() + +val expected = Seq( + "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", "16,Record_16", + "17,Record_17", "18,Record_18", "19,Record_19", "32,Record_32").mkString("\n") +TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + + @Test + def testSQL(): Unit = { +val results = tableEnv + .sql(s"select id, name from 
$tableName where amount < 4 ") + .collect() + +val expected = Seq( + "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", "16,Record_16", + "17,Record_17", "18,Record_18", "19,Record_19", "32,Record_32").mkString("\n") +TestBaseUtils.compareResultAsText(results.asJava, expected) + } + +} + +class TestProjectableTableSource( + fieldTypes: Array[TypeInformation[_]], + fieldNames: Array[String]) + extends BatchTableSource[Row] with ProjectableTableSource[Row] { + + def this() = this( +fieldTypes = Array( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO), +fieldNames = Array[String]("name", "id", "amount", "price") + ) + + /** Returns the data of the table as a [[org.apache.flink.api.java.DataSet]]. */ + override def getDataSet(execEnv: JavaExecEnv): JavaSet[Row] = { +execEnv.createInput(new ProjectableInputFormat(33, fieldNames), getReturnType).setParallelism(1) --- End diff -- Yes, thanks.
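The idea behind ProjectableTableSource can be illustrated without Flink. The planner tells the source which field indices a query needs, and the source materializes only those columns. In this sketch, the class name, the `projectFields`/`read` methods, and the data pattern (`amount = id % 16`) are all hypothetical. The data pattern was chosen only because it is consistent with the ids the test above expects for `amount < 4` over 33 records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Flink-independent sketch of projection pushdown into a source.
final class ProjectableSourceSketch {
    /** Full schema, same field order as the test table: name, id, amount, price. */
    static final String[] FIELD_NAMES = {"name", "id", "amount", "price"};

    private final int[] fields; // indices into FIELD_NAMES, in output order

    ProjectableSourceSketch(int[] fields) {
        this.fields = Arrays.copyOf(fields, fields.length);
    }

    /** Returns a copy that reads only the given fields (indices into the full schema). */
    ProjectableSourceSketch projectFields(int[] newFields) {
        return new ProjectableSourceSketch(newFields);
    }

    /** Generates rows; the data pattern (amount = id % 16) is an assumption. */
    List<Object[]> read(int numRecords) {
        List<Object[]> rows = new ArrayList<>();
        for (long id = 0; id < numRecords; id++) {
            Object[] full = {"Record_" + id, id, (int) (id % 16), id * 0.5};
            Object[] projected = new Object[fields.length];
            for (int i = 0; i < fields.length; i++) {
                projected[i] = full[fields[i]]; // copy only the requested columns
            }
            rows.add(projected);
        }
        return rows;
    }

    public static void main(String[] args) {
        ProjectableSourceSketch all = new ProjectableSourceSketch(new int[] {0, 1, 2, 3});
        // "select id, name ... where amount < 4" needs only id, name and amount.
        ProjectableSourceSketch pruned = all.projectFields(new int[] {1, 0, 2});
        for (Object[] row : pruned.read(33)) {
            if ((Integer) row[2] < 4) {
                System.out.println(row[0] + "," + row[1]);
            }
        }
    }
}
```

The filter column (`amount`) must survive the projection even though it is not selected, which is why the pruned source keeps three fields, not two.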
[GitHub] flink pull request #2923: [FLINK-5220] [Table API & SQL] Flink SQL projectio...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2923#discussion_r91847985 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractorTest.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.rules.util + +import java.math.BigDecimal + +import org.apache.calcite.adapter.java.JavaTypeFactory +import org.apache.calcite.jdbc.JavaTypeFactoryImpl +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem} +import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR} +import org.apache.calcite.rex.{RexBuilder, RexInputRef, RexProgram, RexProgramBuilder} +import org.apache.calcite.sql.fun.SqlStdOperatorTable + +import scala.collection.JavaConverters._ +import org.apache.flink.api.table.plan.rules.util.RexProgramProjectExtractor._ +import org.junit.{Assert, Before, Test} + +/** + * This class is responsible for testing RexProgramProjectExtractor + */ +class RexProgramProjectExtractorTest { + private var typeFactory: JavaTypeFactory = null + private var rexBuilder: RexBuilder = null + private var allFieldTypes: Seq[RelDataType] = null + private val allFieldNames = List("name", "id", "amount", "price") + + @Before + def setUp: Unit = { +typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT) +rexBuilder = new RexBuilder(typeFactory) +allFieldTypes = List(VARCHAR, BIGINT, INTEGER, DOUBLE).map(typeFactory.createSqlType(_)) + } + + @Test + def testExtractRefInputFields: Unit = { +val usedFields = extractRefInputFields(buildRexProgram) +Assert.assertArrayEquals(usedFields, Array(2, 1, 3)) + } + + @Test + def testRewriteRexProgram: Unit = { +val originRexProgram = buildRexProgram + Assert.assertTrue(extractExprStrList(originRexProgram).sameElements(Array( + "$0", + "$1", + "$2", + "$3", + "*($t2, $t3)", + "100", + "<($t4, $t5)", + "6", + ">($t2, $t7)", + "AND($t6, $t8)"))) +// use amount, id, price fields to create a new RexProgram +val usedFields = Array(2, 1, 3) +val types = usedFields.map(allFieldTypes(_)).toList.asJava +val names = usedFields.map(allFieldNames(_)).toList.asJava +val inputRowType = typeFactory.createStructType(types, names) +val newRexProgram = 
rewriteRexProgram(originRexProgram, inputRowType, usedFields, rexBuilder) +Assert.assertTrue(extractExprStrList(newRexProgram).sameElements(Array( + "$0", + "$1", + "$2", + "*($t0, $t2)", + "100", + "<($t3, $t4)", + "6", + ">($t0, $t6)", + "AND($t5, $t7)"))) + } + + private def buildRexProgram: RexProgram = { +val types = allFieldTypes.asJava +val names = allFieldNames.asJava +val inputRowType = typeFactory.createStructType(types, names) +val builder = new RexProgramBuilder(inputRowType, rexBuilder) +val t0 = rexBuilder.makeInputRef(types.get(2), 2) +val t1 = rexBuilder.makeInputRef(types.get(1), 1) +val t2 = rexBuilder.makeInputRef(types.get(3), 3) +val t3 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, t0, t2)) +val t4 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L)) +val t5 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(6L)) +// project: amount, id, amount * price +builder.addProject(t0, "amount") +builder.addProject(t1, "id") +
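The test checks two steps. The first is collecting the input fields a program references, in first-seen order, which gives `(2, 1, 3)`. The second is remapping references into the narrowed input row. Below is a sketch of both steps over a tiny hypothetical expression model (`Expr`, `Ref`, `Lit`, `Call` are illustrative, not Calcite's RexNode). It is not the actual RexProgramProjectExtractor code. With used fields `(2, 1, 3)`, old index 2 maps to 0 and 3 maps to 2, matching the test's expectation that `*($t2, $t3)` becomes `*($t0, $t2)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical expression model: input references, literals and calls.
abstract class Expr {}

final class Ref extends Expr {
    final int index;
    Ref(int index) { this.index = index; }
    @Override public String toString() { return "$" + index; }
}

final class Lit extends Expr {
    final Object value;
    Lit(Object value) { this.value = value; }
    @Override public String toString() { return String.valueOf(value); }
}

final class Call extends Expr {
    final String op;
    final List<Expr> operands;
    Call(String op, Expr... operands) { this.op = op; this.operands = Arrays.asList(operands); }
    @Override public String toString() { return op + operands; }
}

final class ProjectExtractorSketch {
    /** Collects referenced input fields in first-seen order. */
    static int[] extractRefInputFields(List<Expr> exprs) {
        Set<Integer> seen = new LinkedHashSet<>(); // keeps insertion order
        for (Expr e : exprs) collect(e, seen);
        return seen.stream().mapToInt(Integer::intValue).toArray();
    }

    private static void collect(Expr e, Set<Integer> seen) {
        if (e instanceof Ref) {
            seen.add(((Ref) e).index);
        } else if (e instanceof Call) {
            for (Expr op : ((Call) e).operands) collect(op, seen);
        }
    }

    /** Rewrites references so they index into the narrowed row (usedFields order). */
    static Expr rewrite(Expr e, int[] usedFields) {
        Map<Integer, Integer> oldToNew = new HashMap<>();
        for (int i = 0; i < usedFields.length; i++) oldToNew.put(usedFields[i], i);
        return remap(e, oldToNew);
    }

    private static Expr remap(Expr e, Map<Integer, Integer> oldToNew) {
        if (e instanceof Ref) {
            Integer newIndex = oldToNew.get(((Ref) e).index);
            if (newIndex == null) throw new IllegalArgumentException("unused field " + e);
            return new Ref(newIndex);
        }
        if (e instanceof Call) {
            Call c = (Call) e;
            List<Expr> ops = new ArrayList<>();
            for (Expr op : c.operands) ops.add(remap(op, oldToNew));
            return new Call(c.op, ops.toArray(new Expr[0]));
        }
        return e; // literals need no remapping
    }
}
```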
[GitHub] flink pull request #2926: [FLINK-5226] [table] Use correct DataSetCostFactor...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2926#discussion_r91455098 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala --- @@ -73,7 +75,11 @@ class DataSetCalc( val child = this.getInput val rowCnt = metadata.getRowCount(child) -val exprCnt = calcProgram.getExprCount + +// compute number of non-field access expressions (computations, conditions, etc.) +// we only want to account for computations, not for simple projections. +val exprCnt = calcProgram.getExprList.asScala.toList.count(!_.isInstanceOf[RexInputRef]) --- End diff -- Maybe we could exclude RexLiteral as well.
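The suggestion is to count only expressions that do per-row work, skipping input references and literals. Below is a sketch using a hypothetical `ExprKind` enum that stands in for RexInputRef, RexLiteral and RexCall. For the ten-expression program in RexProgramProjectExtractorTest (`$0`..`$3`, `*($t2, $t3)`, `100`, `<($t4, $t5)`, `6`, `>($t2, $t7)`, `AND($t6, $t8)`), `getExprCount` gives 10. Excluding input refs gives 6, and excluding literals as well gives 4.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for RexInputRef, RexLiteral and RexCall.
enum ExprKind { INPUT_REF, LITERAL, CALL }

final class CalcCostSketch {
    /** Counts expressions that compute something; refs and literals are treated as free. */
    static long computationCount(List<ExprKind> exprList) {
        return exprList.stream()
            .filter(k -> k != ExprKind.INPUT_REF && k != ExprKind.LITERAL)
            .count();
    }

    public static void main(String[] args) {
        List<ExprKind> exprs = Arrays.asList(
            ExprKind.INPUT_REF, ExprKind.INPUT_REF, ExprKind.INPUT_REF, ExprKind.INPUT_REF,
            ExprKind.CALL, ExprKind.LITERAL, ExprKind.CALL, ExprKind.LITERAL,
            ExprKind.CALL, ExprKind.CALL);
        System.out.println(computationCount(exprs));
    }
}
```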
[GitHub] flink pull request #2923: [FLINK-5220] [Table API & SQL] Flink SQL projectio...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2923#discussion_r91429660 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala --- @@ -45,7 +45,8 @@ abstract class BatchScan( override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { val rowCnt = metadata.getRowCount(this) -planner.getCostFactory.makeCost(rowCnt, rowCnt, 0) +val columnCnt = getRowType.getFieldCount --- End diff -- Yes, that's right. Thanks.
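The diff shows only that the scan now reads its column count. How that count enters the cost is cut off. Below is a minimal sketch, assuming the I/O component grows with rows times columns. The triple mirrors the `(rowCount, cpu, io)` arguments of Calcite's `RelOptCostFactory#makeCost`, and `ScanCost` is a hypothetical name. Under this assumption, a scan that reads fewer columns after projection pushdown is cheaper, so the planner prefers it.

```java
// Hypothetical cost triple mirroring (rowCount, cpu, io) of RelOptCostFactory#makeCost.
final class ScanCost {
    final double rows;
    final double cpu;
    final double io;

    ScanCost(double rows, double cpu, double io) {
        this.rows = rows;
        this.cpu = cpu;
        this.io = io;
    }

    /** Assumption: I/O is proportional to the number of columns read per row. */
    static ScanCost of(double rowCnt, int columnCnt) {
        return new ScanCost(rowCnt, rowCnt, rowCnt * columnCnt);
    }

    public static void main(String[] args) {
        System.out.println("4 columns: io = " + of(1000, 4).io);
        System.out.println("2 columns: io = " + of(1000, 2).io);
    }
}
```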
[GitHub] flink issue #2923: [FLINK-5220] [Table API & SQL] Flink SQL projection pushd...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2923 @fhueske, thanks for your review. I have updated the PR based on your advice. The main changes are: 1. Adapt the cost model of BatchTableSourceScan. 2. Refactor DataSetCalcConverter into RexProgramProjectExtractor, and add a test case. Thanks.
[GitHub] flink pull request #2923: [FLINK-5220] [Table API & SQL] Flink SQL projectio...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/2923 [FLINK-5220] [Table API & SQL] Flink SQL projection pushdown This PR implements the projection pushdown optimization. It contains two commits. The first is linked to https://issues.apache.org/jira/browse/FLINK-5185 and is preparatory work. The second contains only the projection pushdown changes, so it is probably easier to start reviewing with the second commit. The main changes include: 1. Add PushProjectIntoBatchTableSourceScanRule to match DataSetCalc -> BatchTableSourceScan. 2. Add ProjectableTableSource to represent a TableSource that supports projection pushdown. 3. Change the cost computation of BatchScan. 4. Add a test case. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink jira-5220 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2923.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2923 commit 6f4ecf2efaf424505e05c2d9142c90da24e12ed1 Author: beyond1920 <beyond1...@126.com> Date: 2016-11-29T04:26:02Z Decouple BatchTableSourceScan with TableSourceTable modify constructor of BatchScan, BatchTableSourceScan, DataSetScan Test Plan: junit Reviewers: kete.yangkt Differential Revision: http://phabricator.taobao.net/D6601 modify code style and extract common method let rule decide which tableSource to create a BatchTableScan Decouple BatchTableSourceScan with TableSourceTable make long length shorter to pass the flink code style check commit 181f7f7d4362799549f9ad3e7da2e69838c0f834 Author: beyond1920 <beyond1...@126.com> Date: 2016-12-02T03:33:12Z push project down into BatchTableSourceScan
[GitHub] flink pull request #2921: [FLINK-5185] [Table API & SQL] Decouple BatchTable...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2921#discussion_r90577312 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala --- @@ -32,18 +31,20 @@ class BatchTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, -rowType: RelDataType) - extends BatchScan(cluster, traitSet, table, rowType) { +val tableSource: BatchTableSource[_]) --- End diff -- @wuchong, thanks for your review. The projection pushdown rule will later need access to the actual TableSource of the BatchTableSourceScan so that it can push the projection into it.
[GitHub] flink pull request #2921: [FLINK-5185] [Table API & SQL] Decouple BatchTable...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/2921 [FLINK-5185] [Table API & SQL] Decouple BatchTableSourceScan with TableSourceTable This PR decouples BatchTableSourceScan from TableSourceTable to enable further optimizations, e.g., projection pushdown, filter pushdown, and query pushdown. The idea is to let BatchTableSourceScan hold the TableSource directly instead of a TableSourceTable. The main changes include: 1. Change the constructors of BatchScan/BatchTableSourceScan/DataSetScan. 2. Extract the logic that builds row types from field types and field names into the FlinkTypeFactory class. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink jira-5185 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2921.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2921 commit d0718ff512a931cf0bb024df516a36215f486df5 Author: beyond1920 <beyond1...@126.com> Date: 2016-11-29T04:26:02Z modify constructor of BatchScan, BatchTableSourceScan, DataSetScan Test Plan: junit Reviewers: kete.yangkt Differential Revision: http://phabricator.taobao.net/D6601 commit 068b2ec7df9207bf246305da3dd744fcd339844a Author: beyond1920 <beyond1...@126.com> Date: 2016-11-30T01:21:42Z modify code style and extract common method commit fe0ac06e7a76663100a76ec91c8873de8def548b Author: beyond1920 <beyond1...@126.com> Date: 2016-11-30T03:21:36Z let rule decide which tableSource to create a BatchTableScan commit 1e4bebaaf2d0e23dc561d42feceecb4537336b57 Author: beyond1920 <beyond1...@126.com> Date: 2016-12-01T03:45:34Z Decouple BatchTableSourceScan with TableSourceTable commit 7d558b723a3241a63b1ecdcfb92f15b7676f71bc Author: beyond1920 <beyond1...@126.com> Date: 2016-12-01T07:36:42Z make long length shorter to pass the flink code style check
[GitHub] flink pull request #2791: [FLINK-4449] [cluster management] add Heartbeat ma...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/2791 [FLINK-4449] [cluster management] add Heartbeat manager in ResourceManager with TaskExecutor This PR adds a heartbeat manager between the ResourceManager and the TaskExecutor. It focuses on the ResourceManager side, while #2770 focuses on the TaskExecutor side. The main changes in this PR include: 1. Add a heartbeatService, which is responsible for heartbeat communication with the TaskExecutor. 2. Register the heartbeatService with the ResourceManager as a service when ResourceManagerRunner constructs a new ResourceManager instance. 3. Add heartbeat RPC methods to ResourceManagerGateway and TaskExecutorGateway, and implement the logic in the ResourceManager. 4. Change TaskExecutorRegistrationSuccess, because the RM should tell the TM its heartbeat identifier. 5. Adapt some test cases to these modifications. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink jira-4449 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2791.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2791 commit 1326cc08988929263b31db8b4e8cf3d9107311d6 Author: beyond1920 <beyond1...@126.com> Date: 2016-10-31T09:50:15Z ResourceManager need a heartbeat manager to monitor the connections with all registered TaskExecutor, add heartbeatService in ResourceManager to monitor liveness of taskExecutors commit 3344e817ffa868e02778ea75ccd5c6fb40aa6670 Author: beyond1920 <beyond1...@126.com> Date: 2016-11-12T02:08:55Z add heartbeat logic from resourceManger to taskManager commit f3c432f569a642536bf03fadcbc81823d0b7ea94 Author: beyond1920 <beyond1...@126.com> Date: 2016-11-12T02:38:05Z add heartbeat mechinism between rm and tm
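The ResourceManager-side monitoring described above can be sketched as a map from resource ID to last-heartbeat time. Executors that stay silent longer than a timeout are treated as failed. Below is a minimal sketch with explicit timestamps so it is deterministic. The class and method names are hypothetical and are not Flink's HeartbeatManager API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of heartbeat-based liveness monitoring on the ResourceManager side.
final class HeartbeatMonitorSketch {
    private final long timeoutMillis;
    private final Map<String, Long> lastHeartbeat = new HashMap<>(); // resourceId -> time

    HeartbeatMonitorSketch(long timeoutMillis) { this.timeoutMillis = timeoutMillis; }

    /** Starts monitoring a newly registered TaskExecutor. */
    void register(String resourceId, long nowMillis) {
        lastHeartbeat.put(resourceId, nowMillis);
    }

    /** Records a heartbeat; heartbeats from unknown executors are ignored. */
    void onHeartbeat(String resourceId, long nowMillis) {
        lastHeartbeat.computeIfPresent(resourceId, (id, previous) -> nowMillis);
    }

    /** Removes and returns the executors whose last heartbeat is older than the timeout. */
    List<String> expire(long nowMillis) {
        List<String> failed = new ArrayList<>();
        lastHeartbeat.entrySet().removeIf(e -> {
            boolean timedOut = nowMillis - e.getValue() > timeoutMillis;
            if (timedOut) {
                failed.add(e.getKey());
            }
            return timedOut;
        });
        return failed;
    }

    public static void main(String[] args) {
        HeartbeatMonitorSketch monitor = new HeartbeatMonitorSketch(1000);
        monitor.register("tm-1", 0);
        monitor.register("tm-2", 0);
        monitor.onHeartbeat("tm-1", 900);
        System.out.println("failed at t=1500: " + monitor.expire(1500));
    }
}
```

In a real ResourceManager the expired IDs would feed into failure handling (for example, releasing their slots). Here they are only returned.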
[GitHub] flink pull request #2524: [FLINK-4653] [Client] Refactor JobClientActor to a...
Github user beyond1920 closed the pull request at: https://github.com/apache/flink/pull/2524
[GitHub] flink pull request #2455: [FLINK-4547] [cluster management] when call connec...
Github user beyond1920 closed the pull request at: https://github.com/apache/flink/pull/2455
[GitHub] flink issue #2571: [FLINK-4348] Simplify logic of SlotManager
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2571 @mxm, what would happen in the following case: a TaskExecutor releases a registered slot and then reports its latest SlotReport to the ResourceManager? The ResourceManager should remove the old slot allocation from its own view and mark the slot as free. So I think we should keep the following code from `updateSlotStatus` in the old SlotManager: _else { // slot is reported empty // check whether we also thought this slot is empty if (allocationMap.isAllocated(slotId)) { LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null", slotId, allocationMap.getAllocationID(slotId)); // we thought the slot is in use, correct it allocationMap.removeAllocation(slotId); // we have a free slot! handleFreeSlot(slot); } }_
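The quoted branch treats the TaskExecutor's report as the source of truth. If a slot is reported empty but the ResourceManager still has an allocation for it, the stale allocation is dropped and the slot becomes free. Below is a simplified sketch of that reconciliation. `SlotViewSketch` is hypothetical and, unlike the real SlotManager, it does not validate mismatching non-empty reports.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of reconciling the ResourceManager's slot view with a SlotReport.
final class SlotViewSketch {
    final Map<String, String> allocations = new HashMap<>(); // slotId -> allocationId
    final Set<String> freeSlots = new LinkedHashSet<>();

    /** Applies one entry of a slot report; reportedAllocation is null if the slot is empty. */
    void updateSlotStatus(String slotId, String reportedAllocation) {
        if (reportedAllocation != null) {
            // The TaskExecutor says the slot is in use.
            allocations.put(slotId, reportedAllocation);
            freeSlots.remove(slotId);
        } else if (allocations.containsKey(slotId)) {
            // We thought the slot was in use, but it is reported empty: correct our view.
            System.out.printf("Slot allocation info mismatch! SlotID:%s, current:%s, reported:null%n",
                slotId, allocations.get(slotId));
            allocations.remove(slotId);
            freeSlots.add(slotId);
        } else {
            // Reported empty and we agree: make sure it is tracked as free.
            freeSlots.add(slotId);
        }
    }

    public static void main(String[] args) {
        SlotViewSketch view = new SlotViewSketch();
        view.updateSlotStatus("slot-1", "alloc-A"); // slot allocated
        view.updateSlotStatus("slot-1", null);      // TaskExecutor released it
        System.out.println("free slots: " + view.freeSlots);
    }
}
```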
[GitHub] flink pull request #2540: [FLINK-4606] [cluster management] Integrate the ne...
Github user beyond1920 closed the pull request at: https://github.com/apache/flink/pull/2540
[GitHub] flink pull request #2543: [FLINK-4670] [cluster management] Add watch mechan...
Github user beyond1920 closed the pull request at: https://github.com/apache/flink/pull/2543
[GitHub] flink pull request #2540: [FLINK-4606] [cluster management] Integrate the ne...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2540#discussion_r80836325 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -324,6 +337,158 @@ public void handleError(final Exception exception) { shutDown(); } + /** +* Registers an infoMessage listener +* +* @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager +*/ + @RpcMethod + public void registerInfoMessageListener(final String infoMessageListenerAddress) { + if(infoMessageListeners.containsKey(infoMessageListenerAddress)) { + log.warn("Receive a duplicate registration from info message listener on ({})", infoMessageListenerAddress); + } else { + Future infoMessageListenerRpcGatewayFuture = getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class); + + infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction() { + @Override + public void accept(InfoMessageListenerRpcGateway gateway) { + log.info("Receive a registration from info message listener on ({})", infoMessageListenerAddress); + infoMessageListeners.put(infoMessageListenerAddress, gateway); + } + }, getMainThreadExecutor()); + + infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() { + @Override + public Void apply(Throwable failure) { + log.warn("Receive a registration from unreachable info message listener on ({})", infoMessageListenerAddress); + return null; + } + }, getMainThreadExecutor()); + } + } + + /** +* Unregisters an infoMessage listener +* +* @param infoMessageListenerAddress address of infoMessage listener to unregister from this resource manager +* +*/ + @RpcMethod + public void unRegisterInfoMessageListener(final String infoMessageListenerAddress) { + infoMessageListeners.remove(infoMessageListenerAddress); + } + + /** +* Shutdowns cluster +* +* @param finalStatus +* @param optionalDiagnostics +*/ + 
@RpcMethod + public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) { + log.info("shut down cluster because application is in {}, diagnostics {}", finalStatus, optionalDiagnostics); + shutDownApplication(finalStatus, optionalDiagnostics); + } + + /** +* This method should be called by the framework once it detects that a currently registered task executor has failed. +* +* @param resourceID Id of the worker that has failed. +* @param message An informational message that explains why the worker failed. +*/ + public void notifyWorkerFailed(final ResourceID resourceID, String message) { + runAsync(new Runnable() { + @Override + public void run() { + WorkerType worker = taskExecutorGateways.remove(resourceID); + if (worker != null) { + // TODO :: suggest failed task executor to stop itself + slotManager.notifyTaskManagerFailure(resourceID); + } + } + }); + } + + /** +* Gets the number of currently started TaskManagers. +* +* @return The number of currently started TaskManagers. +*/ + public int getNumberOfStartedTaskManagers() { + return taskExecutorGateways.size(); + } + + /** +* Notifies the resource manager of a fatal error. +* +* IMPORTANT: This should not cleanly shut down this master, but exit it in +* such a way that a high-availability setting would restart this or fail over +* to another master. +*/ + public void onFatalError(final String message, final Throwable error) { + runAsync(new Runnable() { + @Override + public void run() { +
[GitHub] flink issue #2540: [FLINK-4606] [cluster management] Integrate the new Resou...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2540 @mxm, thanks for your review. I have modified the PR based on your advice: 1. Fixed the checkstyle errors and the `AkkaRpcActorTest` and `RpcCompletenessTest` test cases. Sorry for those mistakes; I will take more care next time. 2. About the ResourceManager: at first I used ResourceManager extends RpcEndpoint, but starting a subclass of this ResourceManager failed with an exception. For example, given public class StandaloneResourceManager extends ResourceManager, starting this ResourceManager calls AkkaRpcService#startServer, which throws an exception because the selfGatewayType is mistaken for the TaskExecutorRegistration class. So I changed it to ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint
[GitHub] flink pull request #2540: [FLINK-4606] [cluster management] Integrate the ne...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2540#discussion_r80612002 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -66,15 +67,16 @@ * {@link #requestSlot(SlotRequest)} requests a slot from the resource manager * */ -public class ResourceManager extends RpcEndpoint implements LeaderContender { +public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint implements LeaderContender { --- End diff -- @mxm, I first used `ResourceManager extends RpcEndpoint`, but it failed with an exception when I tried to start a subclass of this ResourceManager. For example, for `public class StandaloneResourceManager extends ResourceManager`, starting this ResourceManager calls `AkkaRpcService#startServer`, which throws an exception because the selfGatewayType was mistaken for the TaskExecutorRegistration class. So I changed it to `ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint`. ---
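The exception described above is consistent with an endpoint that derives its self gateway type from the first type argument of the subclass's generic superclass. The following Java sketch assumes exactly that lookup; all class names are hypothetical stand-ins, not Flink's API. It shows why putting the gateway type first avoids the mix-up:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-ins for RpcGateway, ResourceManagerGateway and TaskExecutorRegistration.
interface GatewaySketch {}
interface ResourceManagerGatewaySketch extends GatewaySketch {}
class WorkerRegistrationSketch {}

abstract class EndpointBase<G extends GatewaySketch> {
    // Assumed lookup: the FIRST type argument of the runtime class's direct generic superclass.
    Type selfGatewayType() {
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }
}

// Variant 1: the worker type is the only type parameter of the manager.
abstract class ManagerV1<W extends WorkerRegistrationSketch>
        extends EndpointBase<ResourceManagerGatewaySketch> {}

final class StandaloneV1 extends ManagerV1<WorkerRegistrationSketch> {}

// Variant 2: the gateway type is kept as the first type parameter.
abstract class ManagerV2<G extends ResourceManagerGatewaySketch, W extends WorkerRegistrationSketch>
        extends EndpointBase<G> {}

final class StandaloneV2 extends ManagerV2<ResourceManagerGatewaySketch, WorkerRegistrationSketch> {}
```

With `StandaloneV1` the lookup returns the worker type, which mirrors the reported mistake; with `StandaloneV2` it returns the gateway. Walking the hierarchy up to the endpoint class would be more robust, at the cost of more reflection code.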
[GitHub] flink issue #2524: [FLINK-4653] [Client] Refactor JobClientActor to adapt to...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2524 @mxm, thanks for your review. I modified the PR based on your advice: 1. The changes now compile successfully; sorry for the basic mistake. 2. The AwaitJobResult method in JobClientUtils now retries upon a TimeoutException until the JobInfoTracker seems to be dead. ---
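A minimal sketch of the retry behavior in point 2. It assumes the job result is exposed as a `java.util.concurrent.Future` and that the tracker's liveness can be queried through a `BooleanSupplier`. `JobResultAwaiter` and its parameters are hypothetical, not the actual `JobClientUtils` signature:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

final class JobResultAwaiter {
    private JobResultAwaiter() {}

    // Waits in slices of `timeout`; after each timeout, keeps waiting only while the tracker is alive.
    static <T> T awaitJobResult(Future<T> result, BooleanSupplier trackerAlive,
                                long timeout, TimeUnit unit) {
        while (true) {
            try {
                return result.get(timeout, unit);
            } catch (TimeoutException e) {
                if (!trackerAlive.getAsBoolean()) {
                    throw new IllegalStateException("JobInfoTracker seems to be dead", e);
                }
                // Tracker still alive: loop and wait for another slice.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for the job result", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("Job failed", e.getCause());
            }
        }
    }
}
```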
[GitHub] flink issue #2543: [FLINK-4670] [cluster management] Add watch mechanism on ...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2543 Hi Till, thanks for your review. First, sorry for linking the wrong JIRA issue; I have already corrected it. About the HeartbeatManager, I think we could separate it into two parts, each focusing on only one function. 1. A death watch to monitor liveness: it can register a watch, unregister a watch, and notify about dead targets, just like DeathWatch in Akka. Although I currently implement it on top of Akka's DeathWatch functionality, the interface is defined independently, so it is fine to provide another implementation in the future. 2. Periodic payload delivery. Because the data sync behavior differs between components (e.g. TM, JM, RM), I'm not sure it is a good idea to extract it into common logic. ---
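The first part, a liveness-only death watch, could look roughly like the following Java sketch. It assumes a timeout-based check driven by an injected clock instead of Akka's DeathWatch. `DeathWatch`, `TimeoutDeathWatch` and their methods are hypothetical names:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical liveness-only interface: no payload, just watch/unwatch.
interface DeathWatch<T> {
    void watch(T target);
    void unwatch(T target);
}

// Timeout-based implementation that does not depend on Akka.
final class TimeoutDeathWatch<T> implements DeathWatch<T> {
    private final long timeoutMillis;
    private final LongSupplier clock;   // injected so the sketch is testable
    private final Consumer<T> onDead;   // notified once per dead target
    private final Map<T, Long> lastSeen = new HashMap<>();

    TimeoutDeathWatch(long timeoutMillis, LongSupplier clock, Consumer<T> onDead) {
        this.timeoutMillis = timeoutMillis;
        this.clock = clock;
        this.onDead = onDead;
    }

    @Override
    public void watch(T target) {
        lastSeen.put(target, clock.getAsLong());
    }

    @Override
    public void unwatch(T target) {
        lastSeen.remove(target);
    }

    // Any sign of life refreshes a watched target; unwatched targets are ignored.
    void heartbeatFrom(T target) {
        long now = clock.getAsLong();
        lastSeen.computeIfPresent(target, (t, previous) -> now);
    }

    // Removes and reports every target that has been silent for longer than the timeout.
    List<T> checkLiveness() {
        long now = clock.getAsLong();
        List<T> dead = new ArrayList<>();
        Iterator<Map.Entry<T, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<T, Long> entry = it.next();
            if (now - entry.getValue() > timeoutMillis) {
                dead.add(entry.getKey());
                it.remove();
            }
        }
        dead.forEach(onDead);
        return dead;
    }
}
```

Keeping payload exchange out of this interface leaves each component (TM, JM, RM) free to layer its own sync logic on top.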
[GitHub] flink pull request #2543: [FLINK-4606] [cluster management] Add watch mechan...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/2543 [FLINK-4606] [cluster management] Add watch mechanism on current RPC framework This PR adds a watch mechanism to the current RPC framework, so that an RPC gateway can be watched to make sure the RPC server is running, just like the previous DeathWatch in Akka. The main changes are: 1. Add watch and unwatch methods to the RpcEndpoint class. 2. Implement a WatchOperationHandler in AkkaInvocationHandler so that it can handle watch and unwatch operations. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink jira-4670 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2543.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2543 commit ed8e1b46b4cb20100fcf161ee9e2b4e4622aaf85 Author: beyond1920 <beyond1...@126.com> Date: 2016-09-23T00:46:30Z watch and unwatch mechanism in new rpc framework Summary: watch and unwatch mechanism in new rpc framework Test Plan: junit Reviewers: #blink, kete.yangkt Differential Revision: http://phabricator.taobao.net/D5849 ---
[GitHub] flink pull request #2540: [FLINK-4606] [Client] Integrate the new ResourceMa...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/2540 [FLINK-4606] [Client] Integrate the new ResourceManager with the existed FlinkResourceManager This PR integrates the new ResourceManager with the existing FlinkResourceManager. The main changes are: 1. Move the useful RPC communication from the existing FlinkResourceManager to the new ResourceManager, e.g. register infoMessageListener, unregister infoMessageListener, shutDownCluster. 2. Make ResourceManager an abstract class and extract the framework-specific behavior. 3. Implement a standalone ResourceManager based on the new base ResourceManager class. 4. Modify the test cases that are affected by the abstract ResourceManager class. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink jira-4606 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2540.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2540 commit d0e0cea949a8bd2e94c333b2204d90f6ed9e740d Author: beyond1920 <beyond1...@126.com> Date: 2016-09-09T01:11:24Z yarn slot manager commit 78a3b44040b73d288f67a7b3491ab6abaf673bdb Author: beyond1920 <beyond1...@126.com> Date: 2016-09-10T03:29:40Z integrate with existing FlinkResourceManager ---
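Points 2 and 3 amount to the template-method pattern: the base class keeps the framework-independent logic and delegates framework-specific steps to abstract hooks. Below is a minimal Java sketch. All class names are hypothetical; only the method names echo `shutDownCluster`, `shutDownApplication` and `getNumberOfStartedTaskManagers` from the ResourceManager code quoted earlier:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical framework-independent base class.
abstract class BaseResourceManagerSketch {
    private final List<String> registeredWorkers = new ArrayList<>();

    // Common logic shared by all deployment modes.
    final void registerWorker(String resourceId) {
        registeredWorkers.add(resourceId);
    }

    final int getNumberOfStartedTaskManagers() {
        return registeredWorkers.size();
    }

    // Common RPC entry point; the actual shutdown is framework specific.
    final void shutDownCluster(String finalStatus, String diagnostics) {
        shutDownApplication(finalStatus, diagnostics);
    }

    // Framework-specific hook (e.g. standalone vs. YARN).
    protected abstract void shutDownApplication(String finalStatus, String diagnostics);
}

final class StandaloneResourceManagerSketch extends BaseResourceManagerSketch {
    String lastShutdown; // recorded for illustration only

    @Override
    protected void shutDownApplication(String finalStatus, String diagnostics) {
        // Standalone mode has no external framework to notify; just record the request.
        lastShutdown = finalStatus + ": " + diagnostics;
    }
}
```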
[GitHub] flink issue #2455: [FLINK-4547] [cluster management] when call connect metho...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2455 @tillrohrmann, I rebased the PR. Thanks. ---
[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...
Github user beyond1920 closed the pull request at: https://github.com/apache/flink/pull/2451 ---
[GitHub] flink pull request #2479: [FLINK-4537] [cluster management] ResourceManager ...
Github user beyond1920 closed the pull request at: https://github.com/apache/flink/pull/2479 ---
[GitHub] flink issue #2524: [FLINK-4653] [Client] Refactor JobClientActor to adapt to...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2524 @mxm, I have already rebased the PR. Thanks. ---
[GitHub] flink pull request #2524: [FLINK-4653] [Client] Refactor JobClientActor to a...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/2524 [FLINK-4653] [Client] Refactor JobClientActor to adapt to new Rpc framework and new cluster managerment The main changes are: 1. Create an RpcEndpoint (tentatively named JobInfoTracker) and an RpcGateway (tentatively named JobInfoTrackerGateway) to replace the old JobClientActor. 2. Change the RPC message communication in JobClientActor to RPC method calls to fit the new RPC framework. 3. JobInfoTracker is responsible for waiting for the jobStateChange and jobResult until the job completes. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink jira-4653 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2524.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2524 commit 5959e94604142784341ba76fb06cccd32473676b Author: beyond1920 <beyond1...@126.com> Date: 2016-09-21T06:00:14Z jobClient refactor Summary: jobClient refactor for apply to new cluster management Test Plan: junit Reviewers: #blink, kete.yangkt Differential Revision: http://phabricator.taobao.net/D5816 ---
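Point 3 can be sketched as a tracker that records every state change and completes a future once a terminal state arrives. This is only an illustration: the enum, the class and the `jobStateChanged` method name are assumptions, not the code in the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical job states; only terminal ones complete the tracker.
enum TrackedJobStatus {
    CREATED, RUNNING, FINISHED, FAILED, CANCELED;

    boolean isTerminal() {
        return this == FINISHED || this == FAILED || this == CANCELED;
    }
}

final class JobInfoTrackerSketch {
    private final List<TrackedJobStatus> history = new ArrayList<>();
    private final CompletableFuture<TrackedJobStatus> result = new CompletableFuture<>();

    // What the JobMaster would invoke via RPC on every state change (assumed name).
    void jobStateChanged(TrackedJobStatus newStatus) {
        history.add(newStatus);
        if (newStatus.isTerminal()) {
            result.complete(newStatus); // later complete() calls are no-ops
        }
    }

    CompletableFuture<TrackedJobStatus> jobResult() {
        return result;
    }

    List<TrackedJobStatus> history() {
        return Collections.unmodifiableList(history);
    }
}
```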
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77943543 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -52,15 +58,28 @@ * */ public class ResourceManager extends RpcEndpoint { - private final Map<JobMasterGateway, InstanceID> jobMasterGateways; + + private final Logger LOG = LoggerFactory.getLogger(getClass()); --- End diff -- RpcEndpoint already has a protected log field; why not use that instead? ---
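A short Java sketch of why the extra field is redundant. It assumes the inherited `log` field is initialized from `getClass()`; `java.util.logging` stands in for SLF4J and the class names are hypothetical. Because `getClass()` returns the runtime subclass even inside the superclass's field initializer, the inherited logger already carries the subclass's name.

```java
import java.util.logging.Logger;

// Stand-in for RpcEndpoint: assumed to create its protected logger from getClass().
abstract class LoggingEndpointSketch {
    protected final Logger log = Logger.getLogger(getClass().getName());
}

final class LoggingResourceManagerSketch extends LoggingEndpointSketch {
    // No extra LOG field: the inherited 'log' is already named after this class.
    void start() {
        log.info("ResourceManager started");
    }

    String loggerName() {
        return log.getName();
    }
}
```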
[GitHub] flink pull request #2479: [FLINK-4537] [cluster management] ResourceManager ...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/2479 [FLINK-4537] [cluster management] ResourceManager registration with JobManager This pull request implements ResourceManager registration with the JobManager, which includes: 1. Check whether the input resourceManagerLeaderId is the same as the current leadershipSessionId of the ResourceManager. If not, two or more ResourceManagers may exist at the same time and the current one is not the proper RM, so it rejects or ignores the registration. 2. Check whether a valid JobMaster exists at the given address by connecting to the address, and reject registrations from invalid addresses (this is hidden in the connect logic). 3. Keep the mapping between JobID and JobMasterGateway. 4. Start a JobMasterLeaderListener for the given JobID to listen to the leadership of the specified JobMaster. 5. Send a registration success acknowledgement to the JobMaster. The main changes are 6 points: 1. Add a getJobMasterLeaderRetriever method to get the job master leader retriever in HighAvailabilityServices, NonHaServices, an inner class in TaskExecutor, and TestingHighAvailabilityServices. 2. Change the registerJobMaster method logic of ResourceManager based on the above steps. 3. Change the input parameters of the registerJobMaster method in the ResourceManager and ResourceManagerGateway classes to be consistent with registerTaskExecutor, from jobMasterRegistration to resourceManagerLeaderId + jobMasterAddress + jobID. 4. Change the result type of the registerJobMaster method in the ResourceManager and ResourceManagerGateway classes to be consistent with RetryingRegistration, from org.apache.flink.runtime.resourcemanager.RegistrationResponse to org.apache.flink.runtime.registration.RegistrationResponse. 5. Add a LeaderRetrievalListener in ResourceManager to listen to the leadership of the JobMaster. 6. Add a test class for the registerJobMaster method in ResourceManager. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink jira-4537 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2479.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2479 commit fa66ac8ae86745dc9daf1fb07c6c96be4f336c90 Author: beyond1920 <beyond1...@126.com> Date: 2016-09-01T07:27:20Z rsourceManager registration with JobManager commit f5e54a21e4a864b5ac5f2f548b6d3dea3edcb619 Author: beyond1920 <beyond1...@126.com> Date: 2016-09-07T09:53:44Z Add JobMasterLeaderRetriverListener at ResourceManager ---
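Steps 1, 3 and 4 can be sketched as follows: a fencing check, a JobID to JobMaster mapping, and a leader-change callback, with plain strings for IDs and addresses. All names are hypothetical. Dropping the mapping on a leadership change is an assumed policy, not necessarily what the PR does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

final class JobMasterRegistrySketch {
    private final UUID leaderSessionId;
    private final Map<String, String> jobMasterByJobId = new HashMap<>();

    JobMasterRegistrySketch(UUID leaderSessionId) {
        this.leaderSessionId = leaderSessionId;
    }

    // Steps 1 and 3: check the fencing token, then remember JobID -> JobMaster address.
    boolean registerJobMaster(UUID resourceManagerLeaderId, String jobMasterAddress, String jobId) {
        if (!leaderSessionId.equals(resourceManagerLeaderId)) {
            return false; // stale or foreign leader id: reject
        }
        jobMasterByJobId.put(jobId, jobMasterAddress);
        return true;
    }

    // Step 4: callback of the per-job leader listener; a null address means leadership was lost.
    void notifyLeaderAddress(String jobId, String newLeaderAddress) {
        String registered = jobMasterByJobId.get(jobId);
        if (registered != null && !registered.equals(newLeaderAddress)) {
            // Assumed policy: drop the stale entry until the new leader registers itself.
            jobMasterByJobId.remove(jobId);
        }
    }

    Optional<String> jobMasterFor(String jobId) {
        return Optional.ofNullable(jobMasterByJobId.get(jobId));
    }
}
```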
[GitHub] flink issue #2455: [FLINK-4547] [cluster management] when call connect metho...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2455 @tillrohrmann @StephanEwen, thanks for your review. I changed the code based on your comments, in two points: 1. Changed the JIRA and the PR subject line to better reflect the actual changes. 2. Modified the test case in AkkaRpcServiceTest that connects to an invalid address. ---
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77769044 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java --- @@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) { * RPC's main thread to avoid race condition). * * @param request The detailed request of the slot +* @return SlotRequestRegistered The confirmation message to be send to the caller */ - public void requestSlot(final SlotRequest request) { + public SlotRequestRegistered requestSlot(final SlotRequest request) { + final AllocationID allocationId = request.getAllocationId(); if (isRequestDuplicated(request)) { - LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId()); - return; + LOG.warn("Duplicated slot request, AllocationID:{}", allocationId); + return null; } // try to fulfil the request with current free slots - ResourceSlot slot = chooseSlotToUse(request, freeSlots); + final ResourceSlot slot = chooseSlotToUse(request, freeSlots); if (slot != null) { LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(), - request.getAllocationId(), request.getJobId()); + allocationId, request.getJobId()); // record this allocation in bookkeeping - allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId()); + allocationMap.addAllocation(slot.getSlotId(), allocationId); // remove selected slot from free pool freeSlots.remove(slot.getSlotId()); - // TODO: send slot request to TaskManager + slot.getTaskExecutorGateway() + .requestSlot(allocationId, leaderIdRegistry.getLeaderID()); --- End diff -- There are three possible responses from the TaskExecutor: 1. Ack, which means the TaskExecutor gives the slot to the specified JobMaster as expected. 2. Decline, if the slot is already occupied by another AllocationID. 3. Timeout, which could be caused by a lost request or response message or by slow network transfer. 
In the first case, the SlotManager needs to do nothing. In the second and third cases, however, the SlotManager should first verify and clear all the previous allocation information for this slot request, and then try to find a proper slot for the request again. I think we should add logic to handle these three possible responses from the TaskExecutor. ---
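A minimal Java sketch of the handling proposed above. It assumes the TaskExecutor's reply arrives as a `CompletableFuture` that completes exceptionally on timeout, and that callbacks run in the RPC main thread, so no extra synchronization is used. The enum, class and field names are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

enum SlotRequestReply { ACK, DECLINE }

final class SlotResponseHandlingSketch {
    // Bookkeeping: slot id -> allocation id (strings stand in for SlotID / AllocationID).
    final Map<String, String> allocationBySlot = new HashMap<>();
    // Allocation ids whose requests must be matched against a slot again.
    final List<String> requestsToRetry = new ArrayList<>();

    void assign(String slotId, String allocationId, CompletableFuture<SlotRequestReply> reply) {
        allocationBySlot.put(slotId, allocationId);
        reply.whenComplete((answer, failure) -> {
            if (failure == null && answer == SlotRequestReply.ACK) {
                return; // case 1: the TaskExecutor accepted, nothing to do
            }
            // Case 2 (decline) and case 3 (timeout or other failure):
            // clear the previous allocation, then queue the request for another attempt.
            allocationBySlot.remove(slotId, allocationId);
            requestsToRetry.add(allocationId);
        });
    }
}
```

On Java 9 and later, the timeout case could be produced with `reply.orTimeout(...)`, which completes the future with a `TimeoutException`.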
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77768256 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java --- @@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) { * RPC's main thread to avoid race condition). * * @param request The detailed request of the slot +* @return SlotRequestRegistered The confirmation message to be send to the caller */ - public void requestSlot(final SlotRequest request) { + public SlotRequestRegistered requestSlot(final SlotRequest request) { + final AllocationID allocationId = request.getAllocationId(); if (isRequestDuplicated(request)) { - LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId()); - return; + LOG.warn("Duplicated slot request, AllocationID:{}", allocationId); + return null; } // try to fulfil the request with current free slots - ResourceSlot slot = chooseSlotToUse(request, freeSlots); + final ResourceSlot slot = chooseSlotToUse(request, freeSlots); if (slot != null) { LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(), - request.getAllocationId(), request.getJobId()); + allocationId, request.getJobId()); // record this allocation in bookkeeping - allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId()); + allocationMap.addAllocation(slot.getSlotId(), allocationId); // remove selected slot from free pool freeSlots.remove(slot.getSlotId()); - // TODO: send slot request to TaskManager + slot.getTaskExecutorGateway() + .requestSlot(allocationId, leaderIdRegistry.getLeaderID()); --- End diff -- The ResourceManager keeps a mapping from ResourceID to TaskExecutorGateway. Maybe we could fetch the TaskExecutorGateway by ResourceID from the ResourceManager here? ---
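Fetching the gateway by ResourceID, instead of storing it in every slot, could look like this Java sketch. It also groups the gateway with an instance ID in a small registration object. The types and names are hypothetical, and IDs are plain strings:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical gateway exposing only the call the SlotManager needs.
interface SlotRequestGateway {
    void requestSlot(String allocationId);
}

// Groups a TaskExecutor's gateway with its instance id.
final class TaskExecutorEntry {
    final SlotRequestGateway gateway;
    final String instanceId;

    TaskExecutorEntry(SlotRequestGateway gateway, String instanceId) {
        this.gateway = gateway;
        this.instanceId = instanceId;
    }
}

// The ResourceManager-side directory, keyed by ResourceID.
final class TaskExecutorDirectory {
    private final Map<String, TaskExecutorEntry> byResourceId = new HashMap<>();

    void register(String resourceId, TaskExecutorEntry entry) {
        byResourceId.put(resourceId, entry);
    }

    // Empty if the TaskExecutor is unknown or has already been removed.
    Optional<SlotRequestGateway> gatewayFor(String resourceId) {
        return Optional.ofNullable(byResourceId.get(resourceId)).map(e -> e.gateway);
    }
}
```

A lookup at request time always sees the current registration, so a slot never holds a gateway for a TaskExecutor that has since re-registered.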
[GitHub] flink issue #2451: [FLINK-4535] [cluster management] resourceManager process...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2451 @tillrohrmann, thanks for your review. I changed the PR based on your advice: 1. Renamed UnmatchedLeaderSessionIDException to LeaderSessionIDException, formatted the code style and method comments, and added checkNotNull to the constructor. 2. Formatted the code style and method comments of ResourceManager. 3. Grouped the TaskExecutorGateway and the InstanceID into a TaskExecutorRegistration. 4. Modified the test cases in ResourceManagerTest that are expected to throw exceptions. Besides, I don't understand this comment of yours: "Should we fail or decline the registration here? So either sending an exception or a RegistrationResponse.Decline message." Do you mean you prefer sending a Decline message? ---
[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2451#discussion_r77757943 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java --- @@ -138,22 +150,46 @@ public SlotAssignment requestSlot(SlotRequest slotRequest) { /** -* -* @param resourceManagerLeaderId The fencing token for the ResourceManager leader +* Register a taskExecutor at the resource manager +* @param resourceManagerLeaderId The fencing token for the ResourceManager leader * @param taskExecutorAddress The address of the TaskExecutor that registers * @param resourceID The resource ID of the TaskExecutor that registers * * @return The response by the ResourceManager. */ @RpcMethod - public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor( - UUID resourceManagerLeaderId, - String taskExecutorAddress, - ResourceID resourceID) { + public Future registerTaskExecutor( + final UUID resourceManagerLeaderId, + final String taskExecutorAddress, + final ResourceID resourceID) { + + if(!leaderSessionID.equals(resourceManagerLeaderId)) { + log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did not equal the received leader session ID {}", + resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId); + return Futures.failed(new UnmatchedLeaderSessionIDException(leaderSessionID, resourceManagerLeaderId)); --- End diff -- Do you prefer to send a Decline message in this case? ---
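The two options can be contrasted in a small Java sketch that uses `CompletableFuture` in place of Scala futures. `RegistrationReply` and `LeaderCheckingRegistry` are hypothetical stand-ins, not the Flink classes:

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical reply hierarchy, loosely modeled on a Success/Decline registration response.
abstract class RegistrationReply {
    static final class Success extends RegistrationReply {}

    static final class Decline extends RegistrationReply {
        final String reason;

        Decline(String reason) {
            this.reason = reason;
        }
    }
}

final class LeaderCheckingRegistry {
    private final UUID leaderSessionId;

    LeaderCheckingRegistry(UUID leaderSessionId) {
        this.leaderSessionId = leaderSessionId;
    }

    // Option A: a leader-ID mismatch is an expected outcome, reported as a Decline value.
    CompletableFuture<RegistrationReply> registerWithDecline(UUID rmLeaderId, String address) {
        if (!leaderSessionId.equals(rmLeaderId)) {
            return CompletableFuture.completedFuture(new RegistrationReply.Decline(
                "registration from " + address + " used a stale leader session ID " + rmLeaderId));
        }
        return CompletableFuture.completedFuture(new RegistrationReply.Success());
    }

    // Option B: a mismatch is treated as an error, reported via an exceptionally completed future.
    CompletableFuture<RegistrationReply> registerWithFailure(UUID rmLeaderId, String address) {
        CompletableFuture<RegistrationReply> result = new CompletableFuture<>();
        if (!leaderSessionId.equals(rmLeaderId)) {
            result.completeExceptionally(
                new IllegalStateException("leader session ID mismatch for " + address));
        } else {
            result.complete(new RegistrationReply.Success());
        }
        return result;
    }
}
```

With option A, a retrying caller can inspect the reply and decide whether to try again; with option B, it has to tell a leader mismatch apart from genuine failures such as timeouts.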
[GitHub] flink pull request #2455: [FLINK-4547] [cluster management] when call connec...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2455#discussion_r77754084 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java --- @@ -189,7 +191,49 @@ public void stop() { rpcEndpoint.tell(Processing.STOP, ActorRef.noSender()); } - // + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null) { + return false; + } + + if(Proxy.isProxyClass(o.getClass())) { + return o.equals(this); + } --- End diff -- @tillrohrmann, it does not mean that AkkaInvocationHandler is a proxy class. It means that if the class of the input parameter is a proxy class, the result of o.equals(this) is returned. Here is my reasoning: when calling the connect method in AkkaRpcService, the returned gateway, which is wrapped in a Future, is in fact a Proxy. When I call the equals method to compare two gateways, the equals method of AkkaInvocationHandler is called, but the input parameter is still the other gateway, whose class is a Proxy class rather than AkkaInvocationHandler. ---
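The proxy behavior explained above can be reproduced with `java.lang.reflect.Proxy`: calling `equals` or `hashCode` on a proxy is dispatched to its handler's `invoke`, and the argument is the other proxy, not the other handler. This sketch unwraps that argument with `Proxy.getInvocationHandler` and compares address and gateway class, which is an alternative to calling `o.equals(this)`. `ConnectedGateway`, `AddressInvocationHandler` and its `connect` method are hypothetical:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Objects;

// Hypothetical gateway interface.
interface ConnectedGateway {
    String getAddress();
}

// Hypothetical handler whose identity is defined by (address, gateway class).
final class AddressInvocationHandler implements InvocationHandler {
    private final String address;
    private final Class<?> gatewayClass;

    AddressInvocationHandler(String address, Class<?> gatewayClass) {
        this.address = address;
        this.gatewayClass = gatewayClass;
    }

    // Hypothetical "connect": every call creates a NEW proxy object.
    static <G> G connect(String address, Class<G> gatewayClass) {
        Object proxy = Proxy.newProxyInstance(
            gatewayClass.getClassLoader(),
            new Class<?>[] {gatewayClass},
            new AddressInvocationHandler(address, gatewayClass));
        return gatewayClass.cast(proxy);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        switch (method.getName()) {
            case "equals":
                // args[0] is the OTHER proxy (or any object), not its handler.
                return equals(args[0]);
            case "hashCode":
                return hashCode();
            case "toString":
                return "Gateway(" + address + ")";
            case "getAddress":
                return address;
            default:
                throw new UnsupportedOperationException(method.getName());
        }
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o != null && Proxy.isProxyClass(o.getClass())) {
            // Unwrap the proxy so that handlers are compared with handlers.
            o = Proxy.getInvocationHandler(o);
        }
        if (!(o instanceof AddressInvocationHandler)) {
            return false;
        }
        AddressInvocationHandler other = (AddressInvocationHandler) o;
        return address.equals(other.address) && gatewayClass.equals(other.gatewayClass);
    }

    @Override
    public int hashCode() {
        return Objects.hash(address, gatewayClass);
    }
}
```

Two `connect` calls with the same address and class yield distinct proxy objects that are nevertheless `equals` and share a `hashCode`.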
[GitHub] flink pull request #2455: [FLINK-4547] [cluster management] Return same obje...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2455#discussion_r77753174 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java --- @@ -189,7 +191,49 @@ public void stop() { rpcEndpoint.tell(Processing.STOP, ActorRef.noSender()); } - // + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null) { + return false; + } + + if(Proxy.isProxyClass(o.getClass())) { + return o.equals(this); + } --- End diff -- @StephanEwen, as Till said, the subject of this PR is misleading. I mean that when the connect method in AkkaRpcService is called with the same address and the same RPC gateway class, the returned gateways are equal, rather than the same gateway object being returned. I changed the subject. ---
[GitHub] flink pull request #2455: [FLINK-4547] [cluster management] Return same obje...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/2455 [FLINK-4547] [cluster management] Return same object when call connect method in AkkaRpcService using same address and same rpc gateway class This pull request adds equals and hashCode methods to the AkkaInvocationHandler class. Currently, every call to the connect method in AkkaRpcService with the same address and the same RPC gateway class returns a completely different gateway object, whose equals and hashCode results differ. It seems reasonable to get the same result (equals returns true and hashCode is the same) when using the same address and the same gateway class. The main changes are 2 points: 1. Add equals and hashCode methods to the AkkaInvocationHandler class. 2. Add a test for the connect method in AkkaRpcService. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink jira-4547 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2455.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2455 commit 8504f1075b4c4c51f37aab2b45b864f885dc76a3 Author: beyond1920 <beyond1...@126.com> Date: 2016-09-01T07:00:03Z add equals and hashcode in AkkaInvocationHandler ---
[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/2451 [FLINK-4535] [cluster management] resourceManager process the registration from TaskExecutor This pull request implements ResourceManager registration with the TaskExecutor, which includes: 1. Check whether the input resourceManagerLeaderId is the same as the current leadershipSessionId of the ResourceManager. If not, two or more ResourceManagers may exist at the same time and the current one is not the proper RM, so it rejects or ignores the registration. 2. Check whether a valid TaskExecutor exists at the given address by connecting to the address, and reject registrations from invalid addresses (this is hidden in the connect method). 3. Keep the mapping between resourceID and taskExecutorGateway, and optionally the mapping between resourceID and container in YARN mode. 4. Send a registration success acknowledgement to the TaskExecutor. The main changes are 3 points: 1. Add UnmatchedLeaderSessionIDException to indicate that the received leader session ID is not the same as the expected one. 2. Change the registerTaskExecutor method of ResourceManager. 3. Add a test class for ResourceManager. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink jira-4535 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2451.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2451 commit fa795ca7a992859398ed30180e50ef036a93b355 Author: beyond1920 <beyond1...@126.com> Date: 2016-09-01T03:14:00Z resourceManager process the registration from TaskExecutor ---
[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...
Github user beyond1920 closed the pull request at: https://github.com/apache/flink/pull/2427 ---
[GitHub] flink issue #2427: [FLINK-4516] [cluster management]leader election of resou...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2427 Hi @mxm, thanks so much. I have already closed the PR. ---
[GitHub] flink issue #2427: [FLINK-4516] [cluster management]leader election of resou...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2427 @mxm, thanks for your review. I changed ResourceManagerHATest to use a mocked RpcService instead of TestingSerialRpcService. ---
[GitHub] flink pull request #2435: [FLINK-4478] [flip-6] Add HeartbeatManager
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2435#discussion_r76938462

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import scala.concurrent.Future;
+
+/**
+ * Interface for the interaction with the {@link HeartbeatManager}. The heartbeat listener is used
+ * for the following things:
+ *
+ *
+ * Notifications about heartbeat timeouts
+ * Payload reports of incoming heartbeats
+ * Retrieval of payloads for outgoing heartbeats
+ *
+ * @param <I> Type of the incoming payload
+ * @param <O> Type of the outgoing payload
+ */
+public interface HeartbeatListener<I, O> {
+
+	/**
+	 * Callback which is called if a heartbeat for the machine identified by the given resource
+	 * ID times out.
+	 *
+	 * @param resourceID Resource ID of the machine whose heartbeat has timed out
+	 */
+	void notifyHeartbeatTimeout(ResourceID resourceID);
--- End diff --

It seems that JobManager and ResourceManager don't have a ResourceID; only TaskExecutor has one. Would it be more appropriate to use something else to identify the heartbeat target? ---
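To make the reviewer's question concrete, here is a hypothetical variant of the listener in which the heartbeat target's identifier is a type parameter `K` instead of being fixed to `ResourceID`. `KeyedHeartbeatListener`, `RecordingListener`, the method names `reportPayload` and `retrievePayload`, and the use of `UUID` keys are all assumptions for illustration; the quoted diff is cut off after `notifyHeartbeatTimeout`, so this does not reproduce the interface's actual remaining methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical variant of HeartbeatListener: the target identifier type K is a type parameter,
// so components without a ResourceID (e.g. JobManager, ResourceManager) could be identified too.
interface KeyedHeartbeatListener<K, I, O> {
    // Called when the heartbeat for the target identified by targetId times out.
    void notifyHeartbeatTimeout(K targetId);

    // Called with the payload carried by an incoming heartbeat (hypothetical method name).
    void reportPayload(K targetId, I payload);

    // Supplies the payload for an outgoing heartbeat (hypothetical method name).
    O retrievePayload();
}

// Example listener keyed by UUID (an assumed identifier type) that just records what it sees.
class RecordingListener implements KeyedHeartbeatListener<UUID, String, String> {
    final List<UUID> timedOut = new ArrayList<>();
    final List<String> received = new ArrayList<>();

    @Override
    public void notifyHeartbeatTimeout(UUID targetId) {
        timedOut.add(targetId);
    }

    @Override
    public void reportPayload(UUID targetId, String payload) {
        received.add(payload);
    }

    @Override
    public String retrievePayload() {
        return "alive";
    }
}
```

The cost of this approach would be one more type parameter on every listener and on whatever manager drives it, in exchange for letting components without a ResourceID take part in heartbeating.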