[GitHub] flink issue #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOptTable,...

2017-09-22 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/4681
  
@fhueske Thanks for your advice. I have already updated the PR based on your 
suggestions.


---


[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...

2017-09-22 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4681#discussion_r140462913
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+  * FlinkRelOptTable wraps a FlinkTable
+  *
+  * @param schema  the [[RelOptSchema]] this table belongs to
+  * @param rowType the type of rows returned by this table
+  * @param names   the identifier for this table. The identifier must be 
unique with
+  *respect to the Connection producing this table.
+  * @param table   wrapped flink table
+  */
+
+class FlinkRelOptTable protected(
+schema: RelOptSchema,
+rowType: RelDataType,
+names: JList[String],
+table: FlinkTable[_]) extends PreparingTable {
+
+  /**
+* Creates a copy of this Flink RelOptTable with new Flink Table and 
new row type.
+*
+* @param newTablenew flink table
+* @param typeFactory type factory to create new row type of new flink 
table
+* @return The copy of this Flink RelOptTable with new Flink table and 
new row type
+*/
+  def copy(newTable: FlinkTable[_], typeFactory: RelDataTypeFactory): 
FlinkRelOptTable = {
+val newRowType = newTable.getRowType(typeFactory)
+FlinkRelOptTable.create(schema, newRowType, names, newTable)
+  }
+
+  /**
+* Extends a table with the given extra fields, which is not supported 
now.
+*
+* @param extendedFields
--- End diff --

Do you mean adding a comment for `extendedFields`?


---


[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...

2017-09-22 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4681#discussion_r140462814
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+  * FlinkRelOptTable wraps a FlinkTable
+  *
+  * @param schema  the [[RelOptSchema]] this table belongs to
+  * @param rowType the type of rows returned by this table
+  * @param names   the identifier for this table. The identifier must be 
unique with
+  *respect to the Connection producing this table.
+  * @param table   wrapped flink table
+  */
+
+class FlinkRelOptTable protected(
+schema: RelOptSchema,
+rowType: RelDataType,
+names: JList[String],
+table: FlinkTable[_]) extends PreparingTable {
--- End diff --

done


---


[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...

2017-09-22 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4681#discussion_r140462752
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+  * FlinkRelOptTable wraps a FlinkTable
+  *
+  * @param schema  the [[RelOptSchema]] this table belongs to
+  * @param rowType the type of rows returned by this table
+  * @param names   the identifier for this table. The identifier must be 
unique with
+  *respect to the Connection producing this table.
+  * @param table   wrapped flink table
+  */
+
+class FlinkRelOptTable protected(
+schema: RelOptSchema,
+rowType: RelDataType,
+names: JList[String],
--- End diff --

done


---


[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...

2017-09-18 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/4681

[FLINK-7636][Table API & SQL]Introduce Flink RelOptTable, and remove 
tableSource from all TableSourceScan node constructor

## What is the purpose of the change

There are two ways to fetch the TableSource of a TableSourceScan node (e.g., 
FlinkLogicalTableSourceScan, PhysicalTableSourceScan and its subclasses):
1. 
{code}
val relOptTable: RelOptTable = getTable()
val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
val tableSouce = tableSourceTable.tableSource
{code}
The result of getTable() is an instance of RelOptTableImpl, and it does not 
change after the RelNode tree is built.
2. The TableSourceScan node takes a TableSource as a constructor parameter, so 
we can fetch the TableSource directly later.
 
After projection push-down (PPD) or filter push-down (FPD) is applied, the 
TableSource returned by these two approaches differs. This is very confusing and 
can cause problems.

This PR fixes the problem by introducing FlinkRelOptTable to replace 
RelOptTableImpl and by removing the tableSource parameter from TableSourceScan's 
constructor. After PPD or FPD, a new FlinkRelOptTable instance that contains a 
new TableSourceTable is passed to the TableSourceScan constructor.
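
To illustrate the new flow at a high level, here is a rough sketch only (not the actual rule code of this PR); variable names such as `newTableSource`, `statistic`, and `typeFactory` are placeholders, and `copy` is the method shown in the FlinkRelOptTable diffs quoted above:

{code}
// Sketch: after a projection or filter has been pushed into the TableSource,
// a push-down rule can derive a new scan from a new table instead of passing
// a TableSource around.
val oldRelOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable]          // scan: the matched TableSourceScan
val newTableSourceTable = new TableSourceTable(newTableSource, statistic)  // wraps the pushed-down TableSource
// copy() wraps the new TableSourceTable and re-derives the row type via the type factory
val newRelOptTable = oldRelOptTable.copy(newTableSourceTable, typeFactory)
// the new TableSourceScan is then created from newRelOptTable; it no longer carries a TableSource field
{code}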

## Brief change log

  - *Adds FlinkRelOptTable to replace the default RelOptTable implementation 
(RelOptTableImpl)*
  - *Adds FlinkCalciteCatalogReader, a subclass of CalciteCatalogReader. It 
overrides the getTable method to return a FlinkRelOptTable instance instead of a 
RelOptTableImpl instance*
  - *Removes the tableSource parameter from TableSourceScan's constructor. A new 
FlinkRelOptTable instance that contains a new TableSourceTable is passed to the 
TableSourceScan constructor.*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added a test that validates that a FlinkRelOptTable instance is returned 
when the getTable method of FlinkCalciteCatalogReader is called*
  - *The other changes are already covered by existing tests, such as 
TableSourceTest and TableSourceITCase*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/beyond1920/flink FLINK-7636

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4681.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4681


commit 7904bd6468f7ae482c4f906e52b29f477fe28602
Author: jingzhang <beyond1...@126.com>
Date:   2017-09-18T10:01:24Z

[FLINK-7636][Table API & SQL]Introduce Flink RelOptTable, and remove 
tableSource from all TableSourceScan node constructor




---


[GitHub] flink issue #3409: [flink-5570] [Table API & SQL]Support register external c...

2017-03-23 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/3409
  
@fhueske @twalthr , thanks for the reviews. I've updated the pr based on 
your comments.


---


[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-23 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r107636567
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 ---
@@ -511,15 +511,15 @@ case class Join(
 }
 
 case class CatalogNode(
-tableName: String,
-rowType: RelDataType) extends LeafNode {
+rowType: RelDataType,
+tablePath: String*) extends LeafNode {
--- End diff --

ok


---


[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-23 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r107636115
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
 ---
@@ -178,4 +178,32 @@ class BatchTableEnvironment(
 
 registerTableFunctionInternal[T](name, tf)
   }
+
+  /**
+* Scans a table from registered temporary tables and registered 
catalogs.
+*
+* The table to scan must be registered in the TableEnvironment or
+* must exist in registered catalog in the TableEnvironment.
+*
+* Example:
+*
+* to scan a registered temporary table
+* {{{
+*   val tab: Table = tableEnv.scan("tableName")
+* }}}
+*
+* to scan a table from a registered catalog
+* {{{
+*   val tab: Table = tableEnv.scan("catalogName.dbName.tableName")
+* }}}
+*
+* @param tablePath The path of the table to scan.
+* @throws TableException if no table is found using the given table 
path.
+* @return The resulting [[Table]].
+*/
+  @throws[TableException]
+  def scan(tablePath: String): Table = {
--- End diff --

good idea, thanks


---


[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-20 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r107061507
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 ---
@@ -511,15 +511,15 @@ case class Join(
 }
 
 case class CatalogNode(
-tableName: String,
-rowType: RelDataType) extends LeafNode {
+rowType: RelDataType,
+tablePath: String*) extends LeafNode {
--- End diff --

Is using arrays instead of varargs in internal classes a convention? I could do 
it that way; however, varargs are more convenient to use than arrays in some 
cases.


---


[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-20 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r107060990
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 ---
@@ -165,3 +165,31 @@ case class AmbiguousTableSourceConverterException(
 
   def this(tableType: String) = this(tableType, null)
 }
+
+/**
+  * Exception for operation on a nonexistent external catalog
+  *
+  * @param catalogName external catalog name
+  * @param cause
+  */
+case class ExternalCatalogNotExistException(
--- End diff --

Hi @twalthr, do you mean that we could introduce two superclasses, 
AlreadyExistException and NotExistException, and make the other exceptions 
subclasses of these two? IMO, there is no need to introduce a superclass, because 
each exception represents a different unexpected situation that users may care 
about; they may want to know whether it is a database, a table, or something else 
that does not exist yet. What's your opinion?
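
For reference, a minimal sketch of the two-superclass alternative under discussion (hypothetical names and parameters, not code from this PR):

{code}
// Hypothetical sketch of the reviewer's proposal:
abstract class NotExistException(msg: String, cause: Throwable)
  extends RuntimeException(msg, cause)

case class ExternalCatalogNotExistException(catalogName: String, cause: Throwable)
  extends NotExistException(s"External catalog $catalogName does not exist.", cause)

case class TableNotExistException(dbName: String, tableName: String, cause: Throwable)
  extends NotExistException(s"Table $dbName.$tableName does not exist.", cause)
{code}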


---


[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-20 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r107059962
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
 ---
@@ -178,4 +178,32 @@ class BatchTableEnvironment(
 
 registerTableFunctionInternal[T](name, tf)
   }
+
+  /**
+* Scans a table from registered temporary tables and registered 
catalogs.
+*
+* The table to scan must be registered in the TableEnvironment or
+* must exist in registered catalog in the TableEnvironment.
+*
+* Example:
+*
+* to scan a registered temporary table
+* {{{
+*   val tab: Table = tableEnv.scan("tableName")
+* }}}
+*
+* to scan a table from a registered catalog
+* {{{
+*   val tab: Table = tableEnv.scan("catalogName.dbName.tableName")
+* }}}
+*
+* @param tablePath The path of the table to scan.
+* @throws TableException if no table is found using the given table 
path.
+* @return The resulting [[Table]].
+*/
+  @throws[TableException]
+  def scan(tablePath: String): Table = {
--- End diff --

Hi @twalthr, there is already a scan method `def scan(tableName: String)`; I 
added a `def scan(catalogName: String, dbName: String, tableName: String)` method 
in the first commit. Fabian suggested that we could extend the previous 
scan(String) to accept varargs parameters, and that we would need to push the 
implementation into the Scala/Java versions of BatchTableEnvironment and 
StreamTableEnvironment because varargs are handled differently by Scala and Java. 
This way we keep the API more concise, because we only need a single scan() 
method. I think it's a good idea, so I updated the PR in the second commit. Maybe 
I'm confusing something there. Any advice? @fhueske, @twalthr.
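
For reference, a rough sketch of the varargs-based approach being discussed (illustrative only; `scanInternal` is a placeholder name, and the exact signatures live in the Scala and Java table environments of this PR):

{code}
// Illustrative sketch — a single scan() method accepting a variable-length table path:
@throws[TableException]
def scan(tablePath: String*): Table = {
  // delegate to a shared internal lookup; scanInternal is a placeholder name
  scanInternal(tablePath.toArray)
}

// The Java table environments would expose the equivalent `Table scan(String... tablePath)`,
// because Scala and Java compile varargs differently.
{code}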


---


[GitHub] flink pull request #3569: [flink-6036]Let catalog support partition

2017-03-19 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/3569

[flink-6036]Let catalog support partition

This PR aims to let the catalog support partitions.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink externalCatalog-with-partition

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3569.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3569


commit 3c81e0601ed81b76a1adabb2f38ece46aa07e78e
Author: jingzhang <beyond1...@126.com>
Date:   2017-03-20T04:09:59Z

Let catalog support partition




---


[GitHub] flink issue #3559: [flink-6037] [Table API & SQL]hotfix: metadata provider d...

2017-03-19 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/3559
  
@KurtYoung , thanks for the review. I've updated the pr based on your 
comments. 


---


[GitHub] flink pull request #3559: [flink-6037] [Table API & SQL]hotfix: metadata pro...

2017-03-18 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3559#discussion_r106797445
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
 ---
@@ -100,6 +101,7 @@ class FlinkPlannerImpl(
   assert(validatedSqlNode != null)
   val rexBuilder: RexBuilder = createRexBuilder
   val cluster: RelOptCluster = RelOptCluster.create(planner, 
rexBuilder)
+  cluster.setMetadataProvider(FlinkDefaultRelMetadataProvider.INSTANCE)
--- End diff --

Because this logic is called in 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery, which contains 
something like
`RelMetadataQuery.THREAD_PROVIDERS.set(
JaninoRelMetadataProvider.of(cluster.getMetadataProvider()));`
so the provider has to be set on the cluster before the conversion runs.
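
A short sketch of the ordering this fix relies on (names taken from the diff above):

{code}
// The Flink provider must be set on the cluster before SqlToRelConverter.convertQuery runs,
// because convertQuery snapshots the cluster's provider into the thread-local
// RelMetadataQuery.THREAD_PROVIDERS.
val rexBuilder: RexBuilder = createRexBuilder
val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
cluster.setMetadataProvider(FlinkDefaultRelMetadataProvider.INSTANCE)
{code}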


---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-17 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r106648264
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util.{HashMap => JHashMap, Map => JMap}
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.plan.stats.TableStats
+
+/**
+  * Table definition of the external catalog.
+  *
+  * @param identifier   identifier of external catalog table, 
including dbName and tableName
+  * @param tableTypetype of external catalog table, e.g csv, 
hbase, kafka
+  * @param schema   schema of table data, including column 
names and column types
+  * @param properties   properties of external catalog table
+  * @param statsstatistics of external catalog table
+  * @param comment  comment of external catalog table
+  * @param createTime   create time of external catalog table
+  * @param lastAccessTime   last access time of of external catalog 
table
+  */
+case class ExternalCatalogTable(
+identifier: TableIdentifier,
+tableType: String,
+schema: DataSchema,
--- End diff --

Right, thanks for reminding me.


---


[GitHub] flink pull request #3559: [flink-6037] [Table API & SQL]hotfix: metadata pro...

2017-03-17 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/3559

[flink-6037] [Table API & SQL]hotfix: metadata provider didn't work in SQL

This PR aims to fix the bug described in 
https://issues.apache.org/jira/browse/FLINK-6037.
After using the right MetadataProvider, the 
org.apache.flink.table.ExpressionReductionTest.testReduceCalcExpressionForBatchSQL
 test no longer passes because the optimized plan changes (that problem is 
tracked in https://issues.apache.org/jira/browse/FLINK-6067 and will be fixed in 
another PR). I simply changed the test SQL to make it pass in this PR.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3559.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3559


commit bd83041507b3f4fdea737b538fb39af4a249e6d2
Author: jingzhang <beyond1...@126.com>
Date:   2017-03-17T09:47:35Z

fix the bug: metadata provider didn't work in SQL




---


[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...

2017-03-16 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/3406
  
@twalthr, thanks for the review. I have updated the PR based on your suggestions. 
I will add documentation later in PR #3409. About the name of ExternalCatalog: I 
notice that there are three kinds of catalogs in Flink now; the first is the 
Calcite catalog, the second is the function catalog, and the third is the 
external catalog. I added the 'external' prefix to distinguish them.


---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-16 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r106404460
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.table.annotation.TableType
+import org.apache.flink.table.catalog.{ExternalCatalogTable, 
TableSourceConverter}
+
+import scala.collection.JavaConverters._
+import java.util.{Set => JSet}
+
+import com.google.common.collect.ImmutableSet
+
+/**
+  * The class defines a converter used to convert [[CsvTableSource]] to
+  * or from [[ExternalCatalogTable]].
+  */
+@TableType(value = "csv")
+class CsvTableSourceConverter extends TableSourceConverter[CsvTableSource] 
{
+
+  private val required: JSet[String] = ImmutableSet.of("path", 
"fieldDelim", "rowDelim")
+
+  override def requiredProperties: JSet[String] = required
+
+  override def fromExternalCatalogTable(
+  externalCatalogTable: ExternalCatalogTable): CsvTableSource = {
+val params = externalCatalogTable.properties.asScala
+val csvTableSourceBuilder = new CsvTableSource.Builder
+
+params.get("path").foreach(csvTableSourceBuilder.path)
+params.get("fieldDelim").foreach(csvTableSourceBuilder.fieldDelimiter)
+params.get("rowDelim").foreach(csvTableSourceBuilder.lineDelimiter)
+params.get("quoteCharacter").foreach(quoteStr =>
+  if (quoteStr.length != 1) {
+throw new IllegalArgumentException("the value of param 
quoteCharacter is invalid")
--- End diff --

Ok


---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-16 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r106403723
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.lang.reflect.Modifier
+import java.net.URL
+
+import org.apache.commons.configuration.{ConfigurationException, 
ConversionException, PropertiesConfiguration}
+import org.apache.flink.table.annotation.TableType
+import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, 
NoMatchedTableSourceConverterException}
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.util.InstantiationUtil
+import org.reflections.Reflections
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+  * The utility class is used to convert ExternalCatalogTable to 
TableSourceTable.
+  */
+object ExternalTableSourceUtil {
+
+  // config file to specify scan package to search TableSourceConverter
+  private val tableSourceConverterConfigFileName = 
"tableSourceConverter.properties"
+
+  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+
+  // registered table type with the TableSourceConverter.
+  // Key is table type name, Value is set of converter class.
+  private val tableTypeToTableSourceConvertersClazz = {
+val registeredConverters =
+  new mutable.HashMap[String, mutable.Set[Class[_ <: 
TableSourceConverter[_
+  with mutable.MultiMap[String, Class[_ <: 
TableSourceConverter[_]]]
+// scan all config files to find TableSourceConverters which are 
annotationed with TableType.
+val resourceUrls = 
getClass.getClassLoader.getResources(tableSourceConverterConfigFileName)
+while (resourceUrls.hasMoreElements) {
+  val url = resourceUrls.nextElement()
+  val scanPackages = parseScanPackagesFromConfigFile(url)
+  scanPackages.foreach(scanPackage => {
+val clazzWithAnnotations = new Reflections(scanPackage)
+.getTypesAnnotatedWith(classOf[TableType])
+clazzWithAnnotations.asScala.foreach(clazz =>
+  if (classOf[TableSourceConverter[_]].isAssignableFrom(clazz)) {
+if (Modifier.isAbstract(clazz.getModifiers()) ||
--- End diff --

Yes, I will add a check for it based on InstantiationUtil.
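
For reference, a small sketch of such a check (assuming 
org.apache.flink.util.InstantiationUtil.checkForInstantiation, which verifies that a class is public, concrete, and has a public no-argument constructor):

{code}
// Sketch only, not the exact code of this PR:
clazzWithAnnotations.asScala.foreach { clazz =>
  if (classOf[TableSourceConverter[_]].isAssignableFrom(clazz)) {
    // throws a RuntimeException for abstract, non-public, or otherwise non-instantiable classes
    InstantiationUtil.checkForInstantiation(clazz)
    // ... register the converter class as in the surrounding code ...
  }
}
{code}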


---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-16 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r106395036
  
--- Diff: 
flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties 
---
@@ -0,0 +1,19 @@

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+
--- End diff --

Ok.


---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-16 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r106395005
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util.{HashMap => JHashMap, Map => JMap}
+import java.lang.{Long => JLong}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.plan.stats.TableStats
+
+/**
+  * Table definition of the external catalog.
+  *
+  * @param identifier   identifier of external catalog table, 
including dbName and tableName
+  * @param tableTypetype of external catalog table, e.g csv, 
hbase, kafka
+  * @param schema   schema of table data, including column 
names and column types
+  * @param properties   properties of external catalog table
+  * @param statsstatistics of external catalog table
+  * @param comment  comment of external catalog table
+  * @param createTime   create time of external catalog table
+  * @param lastAccessTime   last access time of of external catalog 
table
+  */
+case class ExternalCatalogTable(
+identifier: TableIdentifier,
+tableType: String,
+schema: DataSchema,
--- End diff --

ColumnSchema sounds like a schema that describes a column, not a schema that 
describes a table. TableSchema might be better, but there is already a class 
named TableSchema, so I used DataSchema.


---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-16 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r106394128
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
 ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import org.apache.flink.table.api._
+
+/**
+  * This class is responsible for interact with external catalog.
+  * Its main responsibilities including:
+  * 
+  *  create/drop/alter database or tables for DDL operations
+  *  provide tables for calcite catalog, it looks up databases or 
tables in the external catalog
+  * 
+  */
+trait CrudExternalCatalog extends ExternalCatalog {
+
+  /**
+* Adds table into external Catalog
+*
+* @param table  description of table which to create
+* @param ignoreIfExists if table already exists in the catalog, not 
throw exception and leave
+*   the existed table if ignoreIfExists is true;
+*   else throw a TableAlreadyExistException.
+* @throws DatabaseNotExistException  if database does not exist in the 
catalog yet
+* @throws TableAlreadyExistException if table already exists in the 
catalog and
+*ignoreIfExists is false
+*/
+  @throws[DatabaseNotExistException]
+  @throws[TableAlreadyExistException]
+  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): 
Unit
+
+  /**
+* Deletes table from external Catalog
+*
+* @param dbNamedatabase name
+* @param tableName table name
+* @param ignoreIfNotExists if table not exist yet, not throw exception 
if ignoreIfNotExists is
+*  true; else throw TableNotExistException
+* @throws DatabaseNotExistException if database does not exist in the 
catalog yet
+* @throws TableNotExistExceptionif table does not exist in the 
catalog yet
+*/
+  @throws[DatabaseNotExistException]
+  @throws[TableNotExistException]
+  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: 
Boolean): Unit
--- End diff --

Taking usage into consideration: when we create or alter a database, we may need 
to specify other properties besides the database name, so the input parameter of 
createDatabase and alterDatabase is an ExternalCatalogDatabase instance that 
includes the database name and other properties. However, when we drop a 
database, the name is enough.
Take DDL for example:

CREATE DATABASE czech_slovak_names
  CHARACTER SET = 'keybcs2'
  COLLATE = 'keybcs2_bin';

DROP DATABASE [IF EXISTS] database_name;
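
For reference, the asymmetry described above would look roughly like this (hypothetical signatures, merely mirroring the style of the table methods shown in this diff):

{code}
// Hypothetical sketch, not the exact API of this PR:
def createDatabase(db: ExternalCatalogDatabase, ignoreIfExists: Boolean): Unit
def alterDatabase(db: ExternalCatalogDatabase, ignoreIfNotExists: Boolean): Unit
// dropping only needs the name
def dropDatabase(dbName: String, ignoreIfNotExists: Boolean): Unit
{code}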



---


[GitHub] flink issue #3409: [flink-5570] [Table API & SQL]Support register external c...

2017-03-16 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/3409
  
@fhueske , thanks for the review. I have updated the pr based on your 
suggestions.


---


[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-16 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r10636
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -353,19 +388,48 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 * The table to scan must be registered in the [[TableEnvironment]]'s 
catalog.
 *
 * @param tableName The name of the table to scan.
-* @throws ValidationException if no table is registered under the 
given name.
+* @throws TableException if no table is registered under the given 
name.
 * @return The scanned table.
 */
   @throws[ValidationException]
   def scan(tableName: String): Table = {
 if (isRegistered(tableName)) {
-  new Table(this, CatalogNode(tableName, getRowType(tableName)))
+  new Table(this, CatalogNode(getRowType(tableName), tableName))
 } else {
   throw new TableException(s"Table \'$tableName\' was not found in the 
registry.")
 }
   }
 
   /**
+* Scans a table from registered external catalog and returns the 
resulting [[Table]].
+*
+* @param catalogName The name of the catalog to look-up for the table.
+* @param dbName  The database name of the table to scan.
+* @param tableName   The table name to scan.
+* @throws ExternalCatalogNotExistException if no catalog is registered 
under the given name.
+* @throws TableException   if no database/ table is 
found in the given catalog.
+* @return The scanned table.
+*/
+  @throws[ExternalCatalogNotExistException]
+  @throws[TableException]
+  def scan(catalogName: String, dbName: String, tableName: String): Table 
= {
--- End diff --

Good suggestion.


---


[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-16 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r106368629
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -246,6 +251,36 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   }
 
   /**
+* Registers a [[ExternalCatalog]] under a unique name in the 
TableEnvironment's catalog.
+* The databases and tables in the registered external catalog can be 
referenced in SQL queries.
+*
+* @param nameThe name under which the externalCatalog will 
be registered.
+* @param externalCatalog The externalCatalog to register.
+*/
+  def registerExternalCatalog(name: String, externalCatalog: 
ExternalCatalog): Unit = {
+if (this.externalCatalogs.contains(name)) {
--- End diff --

yes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-16 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r106368614
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -246,6 +251,36 @@ abstract class TableEnvironment(val config: 
TableConfig) {
   }
 
   /**
+* Registers a [[ExternalCatalog]] under a unique name in the 
TableEnvironment's catalog.
+* The databases and tables in the registered external catalog can be 
referenced in SQL queries.
+*
+* @param nameThe name under which the externalCatalog will 
be registered.
+* @param externalCatalog The externalCatalog to register.
+*/
+  def registerExternalCatalog(name: String, externalCatalog: 
ExternalCatalog): Unit = {
+if (this.externalCatalogs.contains(name)) {
+  throw new ExternalCatalogAlreadyExistException(name)
+}
+this.externalCatalogs.put(name, externalCatalog)
+// create an external catalog calicte schema, register it on the root 
schema
+ExternalCatalogSchema.create(rootSchema, name, externalCatalog)
+  }
+
+  /**
+* Gets a registered [[ExternalCatalog]] by name
+*
+* @param name The name under which the externalCatalog was previous 
registered.
+* @return the externalCatalog found by name
+*/
+  def getRegisteredExternalCatalog(name: String): ExternalCatalog = {
--- End diff --

We may need to look up the registered catalog to call its create/update/delete 
table or database methods.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-03-16 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3409#discussion_r106368344
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -99,6 +101,9 @@ abstract class TableEnvironment(val config: TableConfig) 
{
   // a counter for unique attribute names
   private val attrNameCntr: AtomicInteger = new AtomicInteger(0)
 
+  // registered external catalog names -> catalog
+  private val externalCatalogs = new HashMap[String, ExternalCatalog]
--- End diff --

@fhueske, storing the catalog allows us to retrieve the registered catalog later. 
Users may want to call a catalog's create/update/delete table or database methods 
when executing DDL in SQL or a similar operation in the Table API.
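
For illustration, a minimal usage sketch of the two methods shown in this diff (a sketch only; the catalog name and helper function are placeholders):

{code}
// Sketch: register a catalog, then look it up again later (e.g. while handling DDL).
def registerAndLookUp(tableEnv: TableEnvironment, catalog: ExternalCatalog): ExternalCatalog = {
  tableEnv.registerExternalCatalog("prodCatalog", catalog)
  // later, retrieve the same catalog instance by name
  tableEnv.getRegisteredExternalCatalog("prodCatalog")
}
{code}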


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...

2017-03-13 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/3406
  
@fhueske , thanks for your review. I already updated the pr based on your 
comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...

2017-03-11 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/3406
  
@fhueske, thanks for your review. I updated the PR based on your comments.
Your suggestion about moving the annotation from TableSource to 
TableSourceConverter is good; I changed the PR accordingly.
About not using scanning at all and instead specifying the converter classes 
explicitly: that can work, too. However, if somebody adds a new 
TableSourceConverter, such as a ParquetTableSourceConverter, users would need to 
change the code or a config file to register the newly added converter, right? 
The scanning approach finds all converters automatically.
Let me know your opinion, thanks.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-10 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r105384191
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CatalogTableHelper.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.io.IOException
+import java.lang.reflect.Modifier
+import java.net.URL
+import java.util.Properties
+
+import org.apache.flink.table.annotation.ExternalCatalogCompatible
+import org.apache.flink.table.api.ExternalCatalogTableTypeNotExistException
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.util.InstantiationUtil
+import org.reflections.Reflections
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+/**
+  * The utility class is used to convert ExternalCatalogTable to 
TableSourceTable.
+  */
+object CatalogTableHelper {
+
+  // config file to specifier the scan package to search tableSources
+  // which is compatible with external catalog.
+  private val tableSourceConfigFileName = "externalCatalogTable.properties"
--- End diff --

Fabian, registering all needed converter classes explicitly instead of using 
scanning can work, too. However, if somebody adds a new TableSourceConverter, 
such as a ParquetTableSourceConverter, wouldn't users need to change the code or 
a config file to include the newly added converter?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-10 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r105363822
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+/**
+  * This class is responsible for interact with external catalog.
+  * Its main responsibilities including:
+  * 
+  *  create/drop/alter database or tables for DDL operations
+  *  provide tables for calcite catalog, it looks up databases or 
tables in the external catalog
+  * 
+  */
+trait ExternalCatalog {
+
+  /**
+* Adds table into external Catalog
+*
+* @param table  description of table which to create
+* @param ignoreIfExists whether to ignore operation if table already 
exists
+*/
+  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): 
Unit
+
+  /**
+* Deletes table from external Catalog
+*
+* @param dbName    database name
+* @param tableName table name
+* @param ignoreIfNotExists whether to ignore operation if table not 
exist yet
+*/
+  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: 
Boolean): Unit
+
+  /**
+* Modifies an existing table in the external catalog
+*
+* @param table description of table which to modify
+* @param ignoreIfNotExists whether to ignore operation if table not 
exist yet
+*/
+  def alterTable(table: ExternalCatalogTable, ignoreIfNotExists: Boolean): 
Unit
+
+  /**
+* Gets table from external Catalog
+*
+* @param dbName    database name
+* @param tableName table name
+* @return table
+*/
+  def getTable(dbName: String, tableName: String): ExternalCatalogTable
--- End diff --

I will add more comments to the method. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-10 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r105363598
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+/**
+  * This class is responsible for interacting with the external catalog.
+  * Its main responsibilities include:
+  * <ul>
+  *   <li>create/drop/alter databases or tables for DDL operations</li>
+  *   <li>provide tables for the Calcite catalog: it looks up databases or
+  *     tables in the external catalog</li>
+  * </ul>
+  */
+trait ExternalCatalog {
+
+  /**
+* Adds table into external Catalog
+*
+* @param table  description of table which to create
+* @param ignoreIfExists whether to ignore operation if table already 
exists
+*/
+  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): 
Unit
--- End diff --

Hi, Fabian. Even though getTable(), getDatabase(), and listTables() are sufficient 
for the integration with Calcite, a complete ExternalCatalog should be responsible 
for CRUD operations on databases/tables. For a read-only ExternalCatalog, we could 
choose not to support the createX, dropX, and alterX methods, for example by throwing 
UnsupportedOperationException.
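
As a minimal sketch of such a read-only catalog (only the methods visible in this 
diff are shown; the backing map is purely illustrative, and the remaining members of 
the trait would be implemented analogously):

    // Serves lookups from a fixed map and rejects all mutations.
    class ReadOnlyCatalog(tables: Map[(String, String), ExternalCatalogTable]) extends ExternalCatalog {

      override def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit =
        throw new UnsupportedOperationException("read-only catalog")

      override def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit =
        throw new UnsupportedOperationException("read-only catalog")

      override def alterTable(table: ExternalCatalogTable, ignoreIfNotExists: Boolean): Unit =
        throw new UnsupportedOperationException("read-only catalog")

      override def getTable(dbName: String, tableName: String): ExternalCatalogTable =
        tables((dbName, tableName))

      // ... the read-only lookup methods of the trait (getDatabase, listTables, ...) go here
    }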


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-10 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r105362031
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CatalogTableHelper.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.io.IOException
+import java.lang.reflect.Modifier
+import java.net.URL
+import java.util.Properties
+
+import org.apache.flink.table.annotation.ExternalCatalogCompatible
+import org.apache.flink.table.api.ExternalCatalogTableTypeNotExistException
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.util.InstantiationUtil
+import org.reflections.Reflections
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+/**
+  * The utility class is used to convert ExternalCatalogTable to 
TableSourceTable.
+  */
+object CatalogTableHelper {
+
+  // config file to specify the packages to scan for tableSources
+  // which are compatible with the external catalog.
+  private val tableSourceConfigFileName = "externalCatalogTable.properties"
--- End diff --

Hi, Fabian. I'm a little confused. Which approach do you mean?
1. We could add a pkgsToScan field to TableConfig, and it would be the users' 
responsibility to specify its value, e.g. 
tableConfig.setPkgsToScan("org.apache.flink.table.sources", "org.apache.flink.streaming.connectors.kafka").
Then users would have to know exactly which module every needed converter belongs to.
2. We could add a converterConfigFileName field to TableConfig, and it would be the users' 
responsibility to specify its value, e.g. 
tableConfig.setConverterConfigFileName("externalCatalogTable.properties").
This way is a little strange because the file name is fixed by convention and 
should not change.
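
To make the two options easier to compare, here is a small sketch of option 1; the 
ScanConfig class and its setter are made up for illustration only (the PR has no such API):

    // Hypothetical configuration holder for option 1: the user enumerates
    // the packages that should be scanned for annotated table sources.
    class ScanConfig {
      private var pkgsToScan: Seq[String] = Seq.empty
      def setPkgsToScan(pkgs: String*): Unit = { pkgsToScan = pkgs }
      def getPkgsToScan: Seq[String] = pkgsToScan
    }

    val config = new ScanConfig
    config.setPkgsToScan(
      "org.apache.flink.table.sources",
      "org.apache.flink.streaming.connectors.kafka")

    // Drawback of option 1: the user must know which module each converter lives in.
    // Option 2 would instead point at a fixed file name such as
    // "externalCatalogTable.properties", which is effectively a constant.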


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-10 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r105362292
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+/**
+  * This class is responsible for interacting with the external catalog.
+  * Its main responsibilities include:
+  * <ul>
+  *   <li>create/drop/alter databases or tables for DDL operations</li>
+  *   <li>provide tables for the Calcite catalog: it looks up databases or
+  *     tables in the external catalog</li>
+  * </ul>
+  */
+trait ExternalCatalog {
+
+  /**
+* Adds table into external Catalog
+*
+* @param table  description of table which to create
+* @param ignoreIfExists whether to ignore operation if table already 
exists
--- End diff --

Good advice, I will change the comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-09 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r105316858
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/ExternalCatalogCompatible.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.annotation;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.table.catalog.TableSourceConverter;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * A tableSource with this annotation is compatible with the external catalog, that is,
+ * an instance of this tableSource can be converted to or from an external catalog table
+ * instance.
+ * The annotation contains the following information:
+ * <ul>
+ *   <li>external catalog table type name for this kind of tableSource</li>
+ *   <li>external catalog table <-> tableSource converter class</li>
+ * </ul>
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Public
+public @interface ExternalCatalogCompatible {
--- End diff --

This suggestion is pretty good, thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...

2017-03-06 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/3406
  
@fhueske, thanks for your review. I changed the PR based on your suggestions, except 
for one point. About adding a version field to ExternalCatalogCompatible: could we 
instead define tableType as an identifier that already includes the version information? 
For example, kafka0.8 / kafka0.9 / kafka1.0, or hive1.0 / hive2.0.
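
A sketch of how such a versioned identifier might look on a table source; the annotation 
element names and the Kafka classes below are assumed for illustration, not taken from this PR:

    // Hypothetical usage: the version is folded into the table type identifier,
    // so no separate version field is needed on the annotation.
    @ExternalCatalogCompatible(
      tableType = "kafka0.9",                                 // identifier carries the version
      converter = classOf[Kafka09TableSourceConverter])       // assumed converter class
    class Kafka09CsvTableSource extends BatchTableSource[Row] {
      // ... getDataSet, getReturnType, etc. elided in this sketch
    }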


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-06 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r104401203
  
--- Diff: flink-libraries/flink-table/pom.xml ---
@@ -92,7 +92,12 @@ under the License.


 
-
+   
--- End diff --

Thanks a lot, I will exclude com.google.code.findbugs.*, whose license is LGPL.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-06 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r104401325
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/ExternalCatalogCompatible.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.annotation;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.table.catalog.TableSourceConverter;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * A tableSource with this annotation is compatible with the external catalog, that is,
+ * an instance of this tableSource can be converted to or from an external catalog table
+ * instance.
+ * The annotation contains the following information:
+ * <ul>
+ *   <li>external catalog table type name for this kind of tableSource</li>
+ *   <li>external catalog table <-> tableSource converter class</li>
+ * </ul>
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Public
+public @interface ExternalCatalogCompatible {
--- End diff --

Could we define tableType as an identifier that already includes the version information? 
For example, kafka0.8 / kafka0.9 / kafka1.0, or hive1.0 / hive2.0.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-06 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r104394065
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.schema._
+import org.apache.flink.table.api.{DatabaseNotExistException, 
TableNotExistException}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters._
+
+/**
+  * This class is responsible for connect external catalog to calcite 
catalog.
+  * In this way, it is possible to look-up and access tables in SQL queries
+  * without registering tables in advance.
+  * The databases in the external catalog registers as calcite sub-Schemas 
of current schema.
+  * The tables in a given database registers as calcite tables
+  * of the [[ExternalCatalogDatabaseSchema]].
+  *
+  * @param catalogIdentifier external catalog name
+  * @param catalog   external catalog
+  */
+class ExternalCatalogSchema(
+catalogIdentifier: String,
+catalog: ExternalCatalog) extends Schema {
+
+  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+
+  /**
+* Looks up database by the given sub-schema name in the external 
catalog,
+* returns it Wrapped in a [[ExternalCatalogDatabaseSchema]] with the 
given database name.
+*
+* @param name Sub-schema name
+* @return Sub-schema with a given name, or null
+*/
+  override def getSubSchema(name: String): Schema = {
+try {
+  val db = catalog.getDatabase(name)
+  if (db != null) {
+new ExternalCatalogDatabaseSchema(db.dbName, catalog)
+  } else {
+null
+  }
+} catch {
+  case e: DatabaseNotExistException =>
+LOG.warn(s"database $name does not exist in externalCatalog 
$catalogIdentifier")
+null
+}
+  }
+
+  /**
+* Lists the databases of the external catalog,
+* returns the lists as the names of this schema's sub-schemas.
+*
+* @return names of this schema's child schemas
+*/
+  override def getSubSchemaNames: util.Set[String] = 
catalog.listDatabases().toSet.asJava
+
+  override def getTable(name: String): Table = null
+
+  override def isMutable: Boolean = true
+
+  override def getFunctions(name: String): util.Collection[Function] =
+util.Collections.emptyList[Function]
+
+  override def getExpression(parentSchema: SchemaPlus, name: String): 
Expression =
+Schemas.subSchemaExpression(parentSchema, name, getClass)
+
+  override def getFunctionNames: util.Set[String] = 
util.Collections.emptySet[String]
+
+  override def getTableNames: util.Set[String] = 
util.Collections.emptySet[String]
+
+  override def contentsHaveChangedSince(lastCheck: Long, now: Long): 
Boolean = false
--- End diff --

Oh, yes, my bad. Will change it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-06 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r104371759
  
--- Diff: flink-libraries/flink-table/pom.xml ---
@@ -92,7 +92,12 @@ under the License.


 
-
+   
--- End diff --

Fabian, about the reflections jar: its license is WTFPL. WTFPL only says 'Everyone is 
permitted to copy and distribute verbatim or modified copies of this license document'; 
however, it does not explicitly say that it is compatible with the ASL.
Besides, I also want to include the classes of this jar in the generated 
flink-table-{version}.jar, just like the Calcite classes. I don't know whether that is 
allowed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-05 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r104343965
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
 ---
@@ -68,6 +62,90 @@ object CommonTestData {
 )
   }
 
+  def getMockedFlinkExternalCatalog: ExternalCatalog = {
+val csvRecord1 = Seq(
+  "1#1#Hi",
+  "2#2#Hello",
+  "3#2#Hello world"
+)
+val tempFilePath1 = writeToTempFile(csvRecord1.mkString("$"), 
"csv-test1", "tmp")
+val externalCatalogTable1 = ExternalCatalogTable(
+  TableIdentifier("db1", "tb1"),
+  "csv",
+  DataSchema(
+Array(
+  BasicTypeInfo.INT_TYPE_INFO,
+  BasicTypeInfo.LONG_TYPE_INFO,
+  BasicTypeInfo.STRING_TYPE_INFO),
+Array("a", "b", "c")
+  ),
+  properties = Map(
+"path" -> tempFilePath1,
+"fieldDelim" -> "#",
+"rowDelim" -> "$"
+  )
+)
+
+val csvRecord2 = Seq(
+  "1#1#0#Hallo#1",
+  "2#2#1#Hallo Welt#2",
+  "2#3#2#Hallo Welt wie#1",
+  "3#4#3#Hallo Welt wie gehts?#2",
+  "3#5#4#ABC#2",
+  "3#6#5#BCD#3",
+  "4#7#6#CDE#2",
+  "4#8#7#DEF#1",
+  "4#9#8#EFG#1",
+  "4#10#9#FGH#2",
+  "5#11#10#GHI#1",
+  "5#12#11#HIJ#3",
+  "5#13#12#IJK#3",
+  "5#14#13#JKL#2",
+  "5#15#14#KLM#2"
+)
+val tempFilePath2 = writeToTempFile(csvRecord2.mkString("$"), 
"csv-test2", "tmp")
+val externalCatalogTable2 = ExternalCatalogTable(
+  TableIdentifier("db2", "tb2"),
+  "csv",
+  DataSchema(
+Array(
+  BasicTypeInfo.INT_TYPE_INFO,
+  BasicTypeInfo.LONG_TYPE_INFO,
+  BasicTypeInfo.INT_TYPE_INFO,
+  BasicTypeInfo.STRING_TYPE_INFO,
+  BasicTypeInfo.LONG_TYPE_INFO),
+Array("d", "e", "f", "g", "h")
+  ),
+  properties = Map(
+"path" -> tempFilePath2,
+"fieldDelim" -> "#",
+"rowDelim" -> "$"
+  )
+)
+val catalog = mock(classOf[ExternalCatalog])
--- End diff --

Good idea


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-05 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r104343679
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
 ---
@@ -68,6 +62,90 @@ object CommonTestData {
 )
   }
 
+  def getMockedFlinkExternalCatalog: ExternalCatalog = {
+val csvRecord1 = Seq(
--- End diff --

val csvRecord1 = Seq(...) is just CSV data; maybe it's better to name it 
csvDataRecords?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-05 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r104340593
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.schema._
+import org.apache.flink.table.api.{DatabaseNotExistException, 
TableNotExistException}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters._
+
+/**
+  * This class is responsible for connect external catalog to calcite 
catalog.
+  * In this way, it is possible to look-up and access tables in SQL queries
+  * without registering tables in advance.
+  * The databases in the external catalog registers as calcite sub-Schemas 
of current schema.
+  * The tables in a given database registers as calcite tables
+  * of the [[ExternalCatalogDatabaseSchema]].
+  *
+  * @param catalogIdentifier external catalog name
+  * @param catalog   external catalog
+  */
+class ExternalCatalogSchema(
+catalogIdentifier: String,
+catalog: ExternalCatalog) extends Schema {
+
+  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+
+  /**
+* Looks up database by the given sub-schema name in the external 
catalog,
+* returns it Wrapped in a [[ExternalCatalogDatabaseSchema]] with the 
given database name.
+*
+* @param name Sub-schema name
+* @return Sub-schema with a given name, or null
+*/
+  override def getSubSchema(name: String): Schema = {
+try {
+  val db = catalog.getDatabase(name)
+  if (db != null) {
+new ExternalCatalogDatabaseSchema(db.dbName, catalog)
+  } else {
+null
+  }
+} catch {
+  case e: DatabaseNotExistException =>
+LOG.warn(s"database $name does not exist in externalCatalog 
$catalogIdentifier")
+null
+}
+  }
+
+  /**
+* Lists the databases of the external catalog,
+* returns the lists as the names of this schema's sub-schemas.
+*
+* @return names of this schema's child schemas
+*/
+  override def getSubSchemaNames: util.Set[String] = 
catalog.listDatabases().toSet.asJava
+
+  override def getTable(name: String): Table = null
+
+  override def isMutable: Boolean = true
+
+  override def getFunctions(name: String): util.Collection[Function] =
+util.Collections.emptyList[Function]
+
+  override def getExpression(parentSchema: SchemaPlus, name: String): 
Expression =
+Schemas.subSchemaExpression(parentSchema, name, getClass)
+
+  override def getFunctionNames: util.Set[String] = 
util.Collections.emptySet[String]
+
+  override def getTableNames: util.Set[String] = 
util.Collections.emptySet[String]
+
+  override def contentsHaveChangedSince(lastCheck: Long, now: Long): 
Boolean = false
--- End diff --

contentsHaveChangedSince is set to false because we want to fetch the latest 
information from the underlying ExternalCatalog instead of using Calcite's caches, 
since the table and database lists of the ExternalCatalog may change at any time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-03-05 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3406#discussion_r104333889
  
--- Diff: flink-libraries/flink-table/pom.xml ---
@@ -92,7 +92,12 @@ under the License.


 
-
+   
--- End diff --

Yes, this problem bothers me, too. The license of the reflections jar is 
WTFPL; does that mean we can do anything with it, and how do we check whether it is 
compatible with AL2? I noticed that the reflections jar is already referenced 
in the flink-parent pom, but only for tests, and would not be included in any 
Flink jars. My original thought was to not only use this jar but also include it 
(like Calcite) in the flink-table-{version}.jar.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3409: [flink-5570] [Table API & SQL]Support register ext...

2017-02-24 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/3409

[flink-5570] [Table API & SQL]Support register external catalog to table 
environment

This PR aims to support registering an external catalog with the TableEnvironment.
The PR contains two commits. The first one is about 
https://issues.apache.org/jira/browse/FLINK-5568; its content is the same as 
https://github.com/apache/flink/pull/3406.
The second commit adds support for external catalog registration, so please 
focus on the second commit when you review this PR.

The main changes in the second commit include:
1. add a registerExternalCatalog method to TableEnvironment to register an external 
catalog
2. add a scan method to TableEnvironment to scan a table of the external catalog 
(see the usage sketch below)
3. add test cases for ExternalCatalog, including registration and scan
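
For illustration, a rough usage sketch of the two new methods described above; the exact 
signatures are an assumption based on this description, not copied from the PR:

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register an external catalog under a name ...
    tableEnv.registerExternalCatalog("demoCatalog", CommonTestData.getMockedFlinkExternalCatalog)
    // ... then scan one of its tables by catalog / database / table name
    val table = tableEnv.scan("demoCatalog", "db1", "tb1")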

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink flink-5570

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3409.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3409


commit d0e1ab20078adc4f788e9c2d2c167f0251ae3476
Author: jingzhang <beyond1...@126.com>
Date:   2017-02-22T11:28:08Z

Introduce interface for external catalog, and provide an in-memory 
implementation for test or develop. Integrate with calcite catalog.

commit 05e2b13847fab01e330d4bf2232886a793f7dd0c
Author: jingzhang <beyond1...@126.com>
Date:   2017-02-24T06:10:50Z

Support register external catalog to table environment




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...

2017-02-23 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/3406

[flink-5568] [Table API & SQL]Introduce interface for catalog, and provide 
an in-memory implementation. Integrate external catalog with calcite catalog

This PR aims to introduce an interface for the catalog, provide an in-memory 
implementation for testing and development, and finally integrate the external catalog 
with the Calcite catalog.
The main changes include:
1. Introduce the ExternalCatalog abstraction, including ExternalCatalogDatabase as a 
database in the catalog and ExternalCatalogTable as a table in the catalog.
2. Provide an in-memory implementation for testing and development.
3. Introduce ExternalCatalogSchema, which is an implementation of Calcite's 
Schema interface. It registers the databases of the ExternalCatalog as Calcite schemas, 
and the tables in a database as Calcite tables.
4. Add the ExternalCatalogCompatible annotation. A TableSource with this 
annotation can be converted to or from an externalCatalogTable. 
ExternalCatalogTableConverter is the converter between an externalCatalogTable and a 
tableSource.
5. Introduce the CatalogTableHelper utility. It has two responsibilities: automatically 
find the TableSources that carry the ExternalCatalogCompatible annotation, and convert 
an ExternalCatalogTable instance into a TableSourceTable instance (see the sketch below).
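
As a small sketch of the scanning part of point 5, using the Reflections library that the 
helper already depends on (the package name below is only an example):

    import org.reflections.Reflections
    import org.apache.flink.table.annotation.ExternalCatalogCompatible
    import scala.collection.JavaConverters._

    // Find all classes in a package that carry the annotation; each hit can then be
    // mapped from its declared table type to its converter class.
    val reflections = new Reflections("org.apache.flink.table.sources")
    val compatibleSources =
      reflections.getTypesAnnotatedWith(classOf[ExternalCatalogCompatible]).asScala
    compatibleSources.foreach(clazz => println(s"convertible table source: ${clazz.getName}"))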

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink dev

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3406.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3406


commit d0e1ab20078adc4f788e9c2d2c167f0251ae3476
Author: jingzhang <beyond1...@126.com>
Date:   2017-02-22T11:28:08Z

Introduce interface for external catalog, and provide an in-memory 
implementation for test or develop. Integrate with calcite catalog.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3197: [FLINK-5567] [Table API & SQL]Introduce and migrate curre...

2017-02-14 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/3197
  
@fhueske, thanks for your review. I have already modified the code based on your 
advice and rebased the PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3196: [FLINK-5566] [Table API & SQL]Introduce structure to hold...

2017-02-13 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/3196
  
@fhueske, thanks for your review. I modified the code based on your advice, 
including the compatibility with Java and the column stats field types.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3196: [FLINK-5566] [Table API & SQL]Introduce structure ...

2017-02-13 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3196#discussion_r100958745
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/ColumnStats.scala
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.stats
+
+/**
+  * column statistics
+  *
+  * @param ndv   number of distinct values
+  * @param nullCount number of nulls
+  * @param avgLen    average length of column values
+  * @param maxLen    max length of column values
+  * @param max   max value of column values
+  * @param min   min value of column values
+  */
+case class ColumnStats(
+ndv: Long,
+nullCount: Long,
+avgLen: Long,
+maxLen: Long,
+max: Option[Any],
+min: Option[Any]) {
--- End diff --

It makes sense to add a field that denotes whether the stats are precise or 
approximate and a field that holds the timestamp when the stats were generated. But 
I'm not sure how these two fields would affect the optimized plan, because we prefer 
to use the provided stats even if they are estimated or a little stale. 
So I didn't add these two fields for now; maybe we will add them later. What do 
you think?
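
For illustration only, the two fields under discussion might look like this; nothing of 
the sort is added in this PR:

    // Hypothetical extension of ColumnStats with provenance information.
    case class ColumnStatsWithProvenance(
        ndv: Long,
        nullCount: Long,
        avgLen: Long,
        maxLen: Long,
        max: Option[Any],
        min: Option[Any],
        isExact: Boolean,    // precise vs. approximate statistics
        generatedAt: Long)   // epoch millis when the statistics were collected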


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3196: [FLINK-5566] [Table API & SQL]Introduce structure ...

2017-02-13 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3196#discussion_r100951579
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/ColumnStats.scala
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.stats
+
+/**
+  * column statistics
+  *
+  * @param ndv   number of distinct values
+  * @param nullCount number of nulls
+  * @param avgLen    average length of column values
+  * @param maxLen    max length of column values
+  * @param max   max value of column values
+  * @param min   min value of column values
+  */
+case class ColumnStats(
+ndv: Long,
+nullCount: Long,
+avgLen: Long,
--- End diff --

ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3196: [FLINK-5566] [Table API & SQL]Introduce structure ...

2017-02-13 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3196#discussion_r100950729
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/ColumnStats.scala
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.stats
+
+/**
+  * column statistics
+  *
+  * @param ndv   number of distinct values
+  * @param nullCount number of nulls
+  * @param avgLen    average length of column values
+  * @param maxLen    max length of column values
+  * @param max   max value of column values
+  * @param min   min value of column values
+  */
+case class ColumnStats(
+ndv: Long,
--- End diff --

@fhueske, there is no need to make all stats optional. If there are no 
statistics for ndv/nullCount/avgLen/maxLen, we could give them an invalid 
value, e.g. -1. But that does not work for max/min, because the max/min values 
could well be negative.
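
A tiny sketch of the max/min point (illustrative only):

    // A sentinel like -1 is unambiguous for counts, which are never negative ...
    val ndvUnknown: Long = -1L
    // ... but not for max/min, where -1 is a perfectly valid value, so an Option is needed:
    val minKnown: Option[Any] = Some(-1)   // an actual minimum of -1
    val minUnknown: Option[Any] = None     // genuinely unknown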


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2791: [FLINK-4449] [cluster management] add Heartbeat manager i...

2017-02-09 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2791
  
@tillrohrmann, thanks for your reply. However, my colleague @wangzhijiang999 already 
had a different implementation for this JIRA; he will open a new PR soon. So 
I will close this PR. Sorry for taking up your time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2791: [FLINK-4449] [cluster management] add Heartbeat ma...

2017-02-09 Thread beyond1920
Github user beyond1920 closed the pull request at:

https://github.com/apache/flink/pull/2791


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3197: [FLINK-5567] [Table API & SQL]Introduce and migrat...

2017-01-23 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/3197

[FLINK-5567] [Table API & SQL]Introduce and migrate current table 
statistics to FlinkStatistics

This PR includes two commits. The first commit is related to 
https://github.com/apache/flink/pull/3196; the second commit introduces and migrates 
the current table statistics to FlinkStatistic. So please focus on the second commit.
The main changes include:
1. Introduce FlinkStatistic class, which is an implementation of Calcite 
Statistic.
2. Integrate FlinkStatistic with FlinkTable. 
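
Conceptually, the wrapper exposes the statistics held in TableStats to the Calcite 
planner; a minimal sketch of the idea is below (the rowCount field name is an assumption, 
and the real FlinkStatistic implements Calcite's Statistic interface with more methods):

    // Returns the table-level row count if statistics are available, otherwise null,
    // which tells the planner to fall back to its default estimate.
    class RowCountStatistic(tableStats: Option[TableStats]) {
      def getRowCount: java.lang.Double =
        tableStats.map(s => java.lang.Double.valueOf(s.rowCount.toDouble)).orNull
    }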

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink flink-5567

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3197.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3197


commit cadc16eefb0e0a9002e536a48b4b9f6824b6ab23
Author: 槿瑜 <jinyu...@alibaba-inc.com>
Date:   2017-01-24T06:34:01Z

Introduce structure to hold table and column level statistics

commit 56c51b0f8d7983b8593946f64ece2b4881f0d723
Author: 槿瑜 <jinyu...@alibaba-inc.com>
Date:   2017-01-24T06:57:08Z

ntroduce and migrate current table statistics to FlinkStatistics




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3196: [FLINK-5566] [Table API & SQL]Introduce structure ...

2017-01-23 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/3196

[FLINK-5566] [Table API & SQL]Introduce structure to hold table and column 
level statistics

This pr aims to introduce structure to hold table and column level 
statistics.
TableStats: Responsible for hold table level statistics
ColumnStats: Responsible for hold column level statistics.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink flink-5566

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3196.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3196


commit cadc16eefb0e0a9002e536a48b4b9f6824b6ab23
Author: 槿瑜 <jinyu...@alibaba-inc.com>
Date:   2017-01-24T06:34:01Z

Introduce structure to hold table and column level statistics




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3058: [FLINK-5394] [Table API & SQL]the estimateRowCount...

2017-01-14 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3058#discussion_r96112195
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
 ---
@@ -71,6 +72,21 @@ class DataSetSort(
 )
   }
 
+  override def estimateRowCount(metadata: RelMetadataQuery): Double = {
+val inputRowCnt = metadata.getRowCount(this.getInput)
+if (inputRowCnt == null) {
+  inputRowCnt
+} else {
+  val rowCount = Math.max(inputRowCnt - limitStart, 0D)
--- End diff --

Yes, that make sense.  I already fix it. Thanks @fhueske .


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3058: [FLINK-5394] [Table API & SQL]the estimateRowCount...

2017-01-13 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3058#discussion_r95967156
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
 ---
@@ -71,6 +72,21 @@ class DataSetSort(
 )
   }
 
+  override def estimateRowCount(metadata: RelMetadataQuery): Double = {
+val inputRowCnt = metadata.getRowCount(this.getInput)
+if (inputRowCnt == null) {
+  inputRowCnt
+} else {
+  val rowCount = Math.max(inputRowCnt - limitStart, 0D)
--- End diff --

Hmm, the estimate of 0 only happens if inputRowCount <= start and (fetch 
is null or fetchValue <= 0). I think this estimate is reasonable in that case. 
Besides, why choose to return 1 instead of 0.1, 0.01, or some other value?
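
A quick worked example of the estimate in the diff above:

    // estimateRowCount for a sort with offset: max(inputRowCount - offset, 0)
    val inputRowCnt = 100.0
    val limitStart  = 95.0
    val estimate    = math.max(inputRowCnt - limitStart, 0d)   // 5.0
    // If inputRowCount <= offset (say input 100, offset 120) the estimate becomes 0,
    // which is the case being debated: keep 0, or fall back to a small constant like 1?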


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3058: [FLINK-5394] [Table API & SQL]the estimateRowCount...

2017-01-04 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/3058

[FLINK-5394] [Table API & SQL]the estimateRowCount method of DataSetCalc 
didn't work

This PR aims to fix the bug reported in 
https://issues.apache.org/jira/browse/FLINK-5394.
The main changes include:
1. add FlinkRelMdRowCount and FlinkDefaultRelMetadataProvider to override 
getRowCount of some Flink RelNodes
2. add a getRowCount method to DataSetSort to provide a more accurate estimate

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink flink-5394

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3058.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3058


commit 8099920fb8759ed1068e7b8153816a7b63089e45
Author: beyond1920 <beyond1...@126.com>
Date:   2016-12-29T07:52:17Z

the estimateRowCount method of DataSetCalc didn't work now, fix it




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2923: [FLINK-5220] [Table API & SQL] Flink SQL projection pushd...

2016-12-10 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2923
  
@fhueske, thanks for your review. I have already modified the code based on your 
advice.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2923: [FLINK-5220] [Table API & SQL] Flink SQL projectio...

2016-12-10 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2923#discussion_r91847990
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.batch
+
+import org.apache.flink.api.common.io.GenericInputFormat
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment 
=> JavaExecEnv}
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
+import 
org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.sources.{BatchTableSource, 
ProjectableTableSource}
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.{Before, Test}
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class ProjectableTableSourceITCase(mode: TestExecutionMode,
+  configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  private val tableName = "MyTable"
+  private var tableEnv: BatchTableEnvironment = null
+
+  @Before
+  def initTableEnv(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+tableEnv = TableEnvironment.getTableEnvironment(env, config)
+tableEnv.registerTableSource(tableName, new TestProjectableTableSource)
+  }
+
+  @Test
+  def testTableAPI(): Unit = {
+val results = tableEnv
+  .scan(tableName)
+  .where("amount < 4")
+  .select("id, name")
+  .collect()
+
+val expected = Seq(
+  "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", 
"16,Record_16",
+  "17,Record_17", "18,Record_18", "19,Record_19", 
"32,Record_32").mkString("\n")
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+
+  @Test
+  def testSQL(): Unit = {
+val results = tableEnv
+  .sql(s"select id, name from $tableName where amount < 4 ")
+  .collect()
+
+val expected = Seq(
+  "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", 
"16,Record_16",
+  "17,Record_17", "18,Record_18", "19,Record_19", 
"32,Record_32").mkString("\n")
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+}
+
+class TestProjectableTableSource(
+  fieldTypes: Array[TypeInformation[_]],
+  fieldNames: Array[String])
+  extends BatchTableSource[Row] with ProjectableTableSource[Row] {
+
+  def this() = this(
+fieldTypes = Array(
+  BasicTypeInfo.STRING_TYPE_INFO,
+  BasicTypeInfo.LONG_TYPE_INFO,
+  BasicTypeInfo.INT_TYPE_INFO,
+  BasicTypeInfo.DOUBLE_TYPE_INFO),
+fieldNames = Array[String]("name", "id", "amount", "price")
+  )
+
+  /** Returns the data of the table as a 
[[org.apache.flink.api.java.DataSet]]. */
+  override def getDataSet(execEnv: JavaExecEnv): JavaSet[Row] = {
+execEnv.createInput(new ProjectableInputFormat(33, fieldNames), 
getReturnType).setParallelism(1)
--- End diff --

yes, thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2923: [FLINK-5220] [Table API & SQL] Flink SQL projectio...

2016-12-10 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2923#discussion_r91847985
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractorTest.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.util
+
+import java.math.BigDecimal
+
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
+import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR}
+import org.apache.calcite.rex.{RexBuilder, RexInputRef, RexProgram, RexProgramBuilder}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.table.plan.rules.util.RexProgramProjectExtractor._
+import org.junit.{Assert, Before, Test}
+
+/**
+  * This class is responsible for testing RexProgramProjectExtractor
+  */
+class RexProgramProjectExtractorTest {
+  private var typeFactory: JavaTypeFactory = null
+  private var rexBuilder: RexBuilder = null
+  private var allFieldTypes: Seq[RelDataType] = null
+  private val allFieldNames = List("name", "id", "amount", "price")
+
+  @Before
+  def setUp: Unit = {
+typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
+rexBuilder = new RexBuilder(typeFactory)
+allFieldTypes = List(VARCHAR, BIGINT, INTEGER, DOUBLE).map(typeFactory.createSqlType(_))
+  }
+
+  @Test
+  def testExtractRefInputFields: Unit = {
+val usedFields = extractRefInputFields(buildRexProgram)
+Assert.assertArrayEquals(usedFields, Array(2, 1, 3))
+  }
+
+  @Test
+  def testRewriteRexProgram: Unit = {
+val originRexProgram = buildRexProgram
+Assert.assertTrue(extractExprStrList(originRexProgram).sameElements(Array(
+  "$0",
+  "$1",
+  "$2",
+  "$3",
+  "*($t2, $t3)",
+  "100",
+  "<($t4, $t5)",
+  "6",
+  ">($t2, $t7)",
+  "AND($t6, $t8)")))
+// use amount, id, price fields to create a new RexProgram
+val usedFields = Array(2, 1, 3)
+val types = usedFields.map(allFieldTypes(_)).toList.asJava
+val names = usedFields.map(allFieldNames(_)).toList.asJava
+val inputRowType = typeFactory.createStructType(types, names)
+val newRexProgram = rewriteRexProgram(originRexProgram, inputRowType, usedFields, rexBuilder)
+Assert.assertTrue(extractExprStrList(newRexProgram).sameElements(Array(
+  "$0",
+  "$1",
+  "$2",
+  "*($t0, $t2)",
+  "100",
+  "<($t3, $t4)",
+  "6",
+  ">($t0, $t6)",
+  "AND($t5, $t7)")))
+  }
+
+  private def buildRexProgram: RexProgram = {
+val types = allFieldTypes.asJava
+val names = allFieldNames.asJava
+val inputRowType = typeFactory.createStructType(types, names)
+val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+val t0 = rexBuilder.makeInputRef(types.get(2), 2)
+val t1 = rexBuilder.makeInputRef(types.get(1), 1)
+val t2 = rexBuilder.makeInputRef(types.get(3), 3)
+val t3 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, t0, t2))
+val t4 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
+val t5 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(6L))
+// project: amount, id, amount * price
+builder.addProject(t0, "amount")
+builder.addProject(t1, "id")
+

[GitHub] flink pull request #2926: [FLINK-5226] [table] Use correct DataSetCostFactor...

2016-12-07 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2926#discussion_r91455098
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
 ---
@@ -73,7 +75,11 @@ class DataSetCalc(
 
 val child = this.getInput
 val rowCnt = metadata.getRowCount(child)
-val exprCnt = calcProgram.getExprCount
+
+// compute number of non-field access expressions (computations, conditions, etc.)
+//   we only want to account for computations, not for simple projections.
+val exprCnt = calcProgram.getExprList.asScala.toList.count(!_.isInstanceOf[RexInputRef])
--- End diff --

Maybe we could exclude RexLiteral as well (see the sketch below).
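
As an illustration of that idea, here is a minimal Java sketch against Calcite's RexProgram API; the class and method names are made up for this example, and the actual change would live in the Scala DataSetCalc code:

```java
import java.util.List;

import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgram;

public final class ExprCounting {

    /**
     * Counts only the expressions that represent real computations,
     * skipping plain field references and constant literals.
     */
    public static int countComputedExprs(RexProgram program) {
        int count = 0;
        List<RexNode> exprs = program.getExprList();
        for (RexNode expr : exprs) {
            if (!(expr instanceof RexInputRef) && !(expr instanceof RexLiteral)) {
                count++;
            }
        }
        return count;
    }

    private ExprCounting() {
    }
}
```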




[GitHub] flink pull request #2923: [FLINK-5220] [Table API & SQL] Flink SQL projectio...

2016-12-07 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2923#discussion_r91429660
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
 ---
@@ -45,7 +45,8 @@ abstract class BatchScan(
   override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
 
 val rowCnt = metadata.getRowCount(this)
-planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
+val columnCnt = getRowType.getFieldCount
--- End diff --

Yes, that's right. Thanks.




[GitHub] flink issue #2923: [FLINK-5220] [Table API & SQL] Flink SQL projection pushd...

2016-12-07 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2923
  
@fhueske , thanks for your review. I have already modified the pr based on your advice, including the following main changes:
1. Adapt the cost model for the BatchTableSourceScan
2. Modify DataSetCalcConverter -> RexProgramProjectExtractor, and add a 
test case
Thanks.




[GitHub] flink pull request #2923: [FLINK-5220] [Table API & SQL] Flink SQL projectio...

2016-12-02 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/2923

[FLINK-5220] [Table API & SQL] Flink SQL projection pushdown

This pr aims to implement the projection pushdown optimization.
There are two commits here: the first one is linked to https://issues.apache.org/jira/browse/FLINK-5185 and is the preparatory work; the second commit contains the actual projection pushdown work, so it is probably better to start with the second commit.
The main changes include:
1. Add PushProjectIntoBatchTableSourceScanRule to match DataSetCalc->BatchTableSourceScan
2. Add ProjectableTableSource to represent a TableSource which supports projection pushdown (a rough sketch follows below)
3. Change the BatchScan cost computation logic
4. Add a test case
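
For orientation only, a hedged Java sketch of what such a projectable source can look like; this is an illustration, not necessarily the exact interface introduced by the PR:

```java
/**
 * Sketch of a table source that can push a projection down into the scan.
 * The field indices refer to positions in the source's original schema.
 */
public interface ProjectableTableSource<T> {

    /** Returns a copy of this source that reads only the given fields. */
    ProjectableTableSource<T> projectFields(int[] fields);
}
```

The pushdown rule can then extract the referenced input fields from the Calc's RexProgram and replace the scan's source with the result of projectFields, so the source never materializes unused columns.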


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-5220

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2923.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2923


commit 6f4ecf2efaf424505e05c2d9142c90da24e12ed1
Author: beyond1920 <beyond1...@126.com>
Date:   2016-11-29T04:26:02Z

Decouple BatchTableSourceScan with TableSourceTable
modify constructor of BatchScan, BatchTableSourceScan, DataSetScan

Test Plan: junit

Reviewers: kete.yangkt

Differential Revision: http://phabricator.taobao.net/D6601

modify code style and extract common method

let rule decide which tableSource to create a BatchTableScan

Decouple BatchTableSourceScan with TableSourceTable

make long length shorter to pass the flink code style check

commit 181f7f7d4362799549f9ad3e7da2e69838c0f834
Author: beyond1920 <beyond1...@126.com>
Date:   2016-12-02T03:33:12Z

push project down into BatchTableSourceScan






[GitHub] flink pull request #2921: [FLINK-5185] [Table API & SQL] Decouple BatchTable...

2016-12-01 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2921#discussion_r90577312
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
 ---
@@ -32,18 +31,20 @@ class BatchTableSourceScan(
 cluster: RelOptCluster,
 traitSet: RelTraitSet,
 table: RelOptTable,
-rowType: RelDataType)
-  extends BatchScan(cluster, traitSet, table, rowType) {
+val tableSource: BatchTableSource[_])
--- End diff --

@wuchong , thanks for your review. For later use, the projection pushdown rule needs to get the actual TableSource of the BatchTableSourceScan and then operate on it.




[GitHub] flink pull request #2921: [FLINK-5185] [Table API & SQL] Decouple BatchTable...

2016-12-01 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/2921

[FLINK-5185] [Table API & SQL] Decouple BatchTableSourceScan with 
TableSourceTable

The pr aims to decouple BatchTableSourceScan from TableSourceTable for further optimizations, e.g. projection pushdown, filter pushdown, query pushdown. The principle is to let BatchTableSourceScan directly hold the TableSource instead of holding a TableSourceTable.
The main changes include:
1. Change the constructors of BatchScan/BatchTableSourceScan/DataSetScan
2. Extract the logic of building row types from field types and field names into the FlinkTypeFactory class (a small sketch of the idea follows below).
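
For readers unfamiliar with that part of the code, a hedged Java illustration of "building a row type from field names and types" using Calcite's type factory; the real logic lives in FlinkTypeFactory (Scala) and may differ in detail:

```java
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;

public final class RowTypeSketch {

    /**
     * Builds a struct (row) type from parallel arrays of field names and
     * field types. Illustration only; not the actual FlinkTypeFactory code.
     */
    public static RelDataType buildRowType(
            RelDataTypeFactory typeFactory,
            String[] fieldNames,
            RelDataType[] fieldTypes) {

        RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder();
        for (int i = 0; i < fieldNames.length; i++) {
            builder.add(fieldNames[i], fieldTypes[i]);
        }
        return builder.build();
    }

    private RowTypeSketch() {
    }
}
```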

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-5185

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2921.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2921


commit d0718ff512a931cf0bb024df516a36215f486df5
Author: beyond1920 <beyond1...@126.com>
Date:   2016-11-29T04:26:02Z

modify constructor of BatchScan, BatchTableSourceScan, DataSetScan

Test Plan: junit

Reviewers: kete.yangkt

Differential Revision: http://phabricator.taobao.net/D6601

commit 068b2ec7df9207bf246305da3dd744fcd339844a
Author: beyond1920 <beyond1...@126.com>
Date:   2016-11-30T01:21:42Z

modify code style and extract common method

commit fe0ac06e7a76663100a76ec91c8873de8def548b
Author: beyond1920 <beyond1...@126.com>
Date:   2016-11-30T03:21:36Z

let rule decide which tableSource to create a BatchTableScan

commit 1e4bebaaf2d0e23dc561d42feceecb4537336b57
Author: beyond1920 <beyond1...@126.com>
Date:   2016-12-01T03:45:34Z

Decouple BatchTableSourceScan with TableSourceTable

commit 7d558b723a3241a63b1ecdcfb92f15b7676f71bc
Author: beyond1920 <beyond1...@126.com>
Date:   2016-12-01T07:36:42Z

make long length shorter to pass the flink code style check






[GitHub] flink pull request #2791: [FLINK-4449] [cluster management] add Heartbeat ma...

2016-11-11 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/2791

[FLINK-4449] [cluster management] add Heartbeat manager in ResourceManager 
with TaskExecutor

This pr aims to add a heartbeat manager between the ResourceManager and the TaskExecutor. This pr focuses on the ResourceManager side, while #2770 focuses on the TaskExecutor side. The main changes in this pr include:
1. Add a heartbeatService which is responsible for heartbeat communication with the TaskExecutor
2. Register the heartbeatService with the ResourceManager as a service when ResourceManagerRunner constructs a new ResourceManager instance
3. Add heartbeat RPC methods to ResourceManagerGateway and TaskExecutorGateway, and implement the logic in the ResourceManager (a rough sketch of the shape of these RPCs follows below)
4. Change TaskExecutorRegistrationSuccess because the RM should tell the TM its heartbeat identifier
5. Change some test cases in order to adapt to the modification
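
Purely as a hedged illustration of point 3, the gateway and method names below are assumptions for the sketch, not the exact signatures of the PR:

```java
import java.util.UUID;

/** Resource manager side: receives heartbeats reported by task executors. */
interface ResourceManagerHeartbeatGateway {

    /** Called by a task executor to report that it is still alive. */
    void heartbeatFromTaskExecutor(UUID resourceManagerLeaderId, String taskExecutorAddress);
}

/** Task executor side: asked by the resource manager to heartbeat back. */
interface TaskExecutorHeartbeatGateway {

    /** Called by the resource manager; the task executor answers with a heartbeat. */
    void requestHeartbeat(UUID heartbeatId);
}
```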

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-4449

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2791.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2791


commit 1326cc08988929263b31db8b4e8cf3d9107311d6
Author: beyond1920 <beyond1...@126.com>
Date:   2016-10-31T09:50:15Z

ResourceManager need a heartbeat manager to monitor the connections with 
all registered TaskExecutor, add heartbeatService in ResourceManager to monitor 
liveness of taskExecutors

commit 3344e817ffa868e02778ea75ccd5c6fb40aa6670
Author: beyond1920 <beyond1...@126.com>
Date:   2016-11-12T02:08:55Z

add heartbeat logic from resourceManger to taskManager

commit f3c432f569a642536bf03fadcbc81823d0b7ea94
Author: beyond1920 <beyond1...@126.com>
Date:   2016-11-12T02:38:05Z

add heartbeat mechinism between rm and tm






[GitHub] flink pull request #2524: [FLINK-4653] [Client] Refactor JobClientActor to a...

2016-11-11 Thread beyond1920
Github user beyond1920 closed the pull request at:

https://github.com/apache/flink/pull/2524




[GitHub] flink pull request #2455: [FLINK-4547] [cluster management] when call connec...

2016-10-16 Thread beyond1920
Github user beyond1920 closed the pull request at:

https://github.com/apache/flink/pull/2455




[GitHub] flink issue #2571: [FLINK-4348] Simplify logic of SlotManager

2016-09-29 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2571
  
@mxm, what would happen in the following case: a taskExecutor releases a registered slot and then reports its latest slotReport to the ResourceManager; the ResourceManager should remove the old slot allocation from its own view and mark the slot free. So I think we should keep the following code from the old SlotManager `updateSlotStatus`:
    else {
        // slot is reported empty

        // check whether we also thought this slot is empty
        if (allocationMap.isAllocated(slotId)) {
            LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null",
                slotId, allocationMap.getAllocationID(slotId));

            // we thought the slot is in use, correct it
            allocationMap.removeAllocation(slotId);

            // we have a free slot!
            handleFreeSlot(slot);
        }
    }




[GitHub] flink pull request #2540: [FLINK-4606] [cluster management] Integrate the ne...

2016-09-28 Thread beyond1920
Github user beyond1920 closed the pull request at:

https://github.com/apache/flink/pull/2540




[GitHub] flink pull request #2543: [FLINK-4670] [cluster management] Add watch mechan...

2016-09-27 Thread beyond1920
Github user beyond1920 closed the pull request at:

https://github.com/apache/flink/pull/2543




[GitHub] flink pull request #2540: [FLINK-4606] [cluster management] Integrate the ne...

2016-09-27 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2540#discussion_r80836325
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -324,6 +337,158 @@ public void handleError(final Exception exception) {
shutDown();
}
 
+   /**
+* Registers an infoMessage listener
+*
+* @param infoMessageListenerAddress address of infoMessage listener to 
register to this resource manager
+*/
+   @RpcMethod
+   public void registerInfoMessageListener(final String infoMessageListenerAddress) {
+   if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
+   log.warn("Receive a duplicate registration from info message listener on ({})", infoMessageListenerAddress);
+   } else {
+   Future infoMessageListenerRpcGatewayFuture = getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class);
+
+   infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction() {
+   @Override
+   public void accept(InfoMessageListenerRpcGateway gateway) {
+   log.info("Receive a registration from info message listener on ({})", infoMessageListenerAddress);
+   infoMessageListeners.put(infoMessageListenerAddress, gateway);
+   }
+   }, getMainThreadExecutor());
+
+   infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+   @Override
+   public Void apply(Throwable failure) {
+   log.warn("Receive a registration from unreachable info message listener on ({})", infoMessageListenerAddress);
+   return null;
+   }
+   }, getMainThreadExecutor());
+   }
+   }
+
+   /**
+* Unregisters an infoMessage listener
+*
+* @param infoMessageListenerAddress address of infoMessage listener to 
unregister from this resource manager
+*
+*/
+   @RpcMethod
+   public void unRegisterInfoMessageListener(final String infoMessageListenerAddress) {
+   infoMessageListeners.remove(infoMessageListenerAddress);
+   }
+
+   /**
+* Shutdowns cluster
+*
+* @param finalStatus
+* @param optionalDiagnostics
+*/
+   @RpcMethod
+   public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) {
+   log.info("shut down cluster because application is in {}, diagnostics {}", finalStatus, optionalDiagnostics);
+   shutDownApplication(finalStatus, optionalDiagnostics);
+   }
+
+   /**
+* This method should be called by the framework once it detects that a 
currently registered task executor has failed.
+*
+* @param resourceID Id of the worker that has failed.
+* @param message An informational message that explains why the worker 
failed.
+*/
+   public void notifyWorkerFailed(final ResourceID resourceID, String message) {
+   runAsync(new Runnable() {
+   @Override
+   public void run() {
+   WorkerType worker = taskExecutorGateways.remove(resourceID);
+   if (worker != null) {
+   // TODO :: suggest failed task executor to stop itself
+   slotManager.notifyTaskManagerFailure(resourceID);
+   }
+   }
+   });
+   }
+
+   /**
+* Gets the number of currently started TaskManagers.
+*
+* @return The number of currently started TaskManagers.
+*/
+   public int getNumberOfStartedTaskManagers() {
+   return taskExecutorGateways.size();
+   }
+
+   /**
+* Notifies the resource manager of a fatal error.
+*
+* IMPORTANT: This should not cleanly shut down this master, 
but exit it in
+* such a way that a high-availability setting would restart this or 
fail over
+* to another master.
+*/
+   public void onFatalError(final String message, final Throwable error) {
+   runAsync(new Runnable() {
+   @Override
+   public void run() {
+  

[GitHub] flink issue #2540: [FLINK-4606] [cluster management] Integrate the new Resou...

2016-09-27 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2540
  
@mxm , thanks for your review, I modified the pr based on your advice:
1. Fix the checkstyle error and the `AkkaRpcActorTest` and `RpcCompletenessTest` test cases. Sorry for those mistakes, I will take more care next time.
2. About the resourceManager: I adopted ResourceManager extends RpcEndpoint at first, but it would fail with an exception when I wanted to start a subclass of this ResourceManager. For example, with public class StandaloneResourceManager extends ResourceManager, when I start this ResourceManager it would call AkkaRpcService#startServer, and an exception would be thrown there because selfGatewayType was mistaken for the TaskExecutorRegistration class. So I changed it to ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint




[GitHub] flink pull request #2540: [FLINK-4606] [cluster management] Integrate the ne...

2016-09-26 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2540#discussion_r80612002
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -66,15 +67,16 @@
  * {@link #requestSlot(SlotRequest)} requests a slot from the 
resource manager
  * 
  */
-public class ResourceManager extends RpcEndpoint implements LeaderContender {
+public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint implements LeaderContender {
--- End diff --

@mxm , I adopted `ResourceManager extends RpcEndpoint` at first, but it would fail with an exception when I wanted to start a subclass of this ResourceManager. For example, with `public class StandaloneResourceManager extends ResourceManager`, when I start this ResourceManager it would call AkkaRpcService#startServer, and an exception would be thrown there because selfGatewayType was mistaken for the TaskExecutorRegistration class. So I changed it to `ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint`




[GitHub] flink issue #2524: [FLINK-4653] [Client] Refactor JobClientActor to adapt to...

2016-09-26 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2524
  
@mxm , thanks for your review. I modified the content based on your advice:
1. The changes compile successfully now, sorry for that mistake.
2. The AwaitJobResult method in JobClientUtils now retries upon TimeoutException until the JobInfoTracker seems to be dead.




[GitHub] flink issue #2543: [FLINK-4670] [cluster management] Add watch mechanism on ...

2016-09-23 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2543
  
Hi, Till. Thanks for your review.
First, sorry for binding to the wrong JIRA link; I have already corrected it.
About the heartbeatManager, I thought we could separate the HeartbeatManager into two parts, each part focused on a single function (a rough sketch follows below):
1. Death watch to monitor liveness: it can register a watch, unregister a watch, and notify about dead targets, just like the death watch in Akka. Although I currently implement it on top of Akka's dead watch functionality, the interface is defined, so it is fine to provide another implementation of it in the future.
2. Periodic payload delivery. But because the data sync behavior is different in different components, e.g. TM, JM, RM, I'm not sure it is good to extract that logic into a common place.
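
Just to make the proposed split concrete, a hedged sketch with made-up interface names (not the PR's actual types):

```java
import java.util.function.Consumer;

/** Responsibility 1: liveness monitoring, similar to Akka's death watch. */
interface DeathWatchSketch<T> {

    /** Starts monitoring the given target for liveness. */
    void watch(T target);

    /** Stops monitoring the given target. */
    void unwatch(T target);

    /** Registers a callback that is notified when a watched target is considered dead. */
    void onTargetDead(Consumer<T> callback);
}

/** Responsibility 2: periodic, component-specific payload delivery. */
interface PayloadDeliverySketch<P> {

    /** Attaches the given payload to the next outgoing heartbeat. */
    void deliver(P payload);
}
```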





[GitHub] flink pull request #2543: [FLINK-4606] [cluster management] Add watch mechan...

2016-09-23 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/2543

[FLINK-4606] [cluster management] Add watch mechanism on current RPC 
framework

This pr aims at adding a watch mechanism to the current RPC framework, so an RPC gateway can be watched to make sure the rpc server is running, just like the previous DeathWatch in Akka.
The main changes are the following (a rough usage sketch is given below):
1. Add watch and unwatch methods to the RpcEndpoint class
2. Implement WatchOperationHandler in AkkaInvocationHandler, so it can handle watch and unwatch operations
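
A hedged sketch of how such an API might look from an endpoint's point of view; the signatures are assumptions for illustration, not the PR's exact methods:

```java
/** Callback invoked when a watched remote rpc server is detected to be down. */
interface WatchCallbackSketch {
    void notifyTargetUnreachable(String targetAddress);
}

/** Hypothetical additions to RpcEndpoint described in point 1 above. */
abstract class WatchableRpcEndpointSketch {

    /** Starts watching the rpc server behind the given gateway address. */
    public abstract void watch(String targetAddress, WatchCallbackSketch callback);

    /** Stops watching the rpc server behind the given gateway address. */
    public abstract void unwatch(String targetAddress);
}
```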


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-4670

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2543.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2543


commit ed8e1b46b4cb20100fcf161ee9e2b4e4622aaf85
Author: beyond1920 <beyond1...@126.com>
Date:   2016-09-23T00:46:30Z

watch and unwatch mechanism in new rpc framework

Summary: watch and unwatch mechanism in new rpc framework

Test Plan: junit

Reviewers: #blink, kete.yangkt

Differential Revision: http://phabricator.taobao.net/D5849






[GitHub] flink pull request #2540: [FLINK-4606] [Client] Integrate the new ResourceMa...

2016-09-23 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/2540

[FLINK-4606] [Client] Integrate the new ResourceManager with the existed 
FlinkResourceManager

This pr aims to integrate the new ResourceManager with the existing FlinkResourceManager; the main changes include:
1. Move the useful rpc communication from the existing FlinkResourceManager to the new ResourceManager, e.g. register infoMessageListener, unregister infoMessageListener, shutDownCluster
2. Make ResourceManager an abstract class and extract the framework-specific behavior (a rough sketch follows below)
3. Implement a standalone resourceManager based on the new base ResourceManager class.
4. Modify the test cases which are affected by the abstract resourceManager class.
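
To illustrate point 2, a hedged sketch of the "abstract base class plus framework-specific subclass" shape; the method names are assumptions, not the PR's exact API:

```java
/** Base class: generic RPC and bookkeeping logic would live here. */
abstract class BaseResourceManagerSketch {

    /** Framework-specific: ask the cluster framework (YARN, standalone, ...) for a new worker. */
    protected abstract void startNewWorker(int numberOfSlots);

    /** Framework-specific: shut the whole application down. */
    protected abstract void shutDownApplication(String diagnostics);
}

/** Standalone implementation: the set of TaskManagers is fixed. */
class StandaloneResourceManagerSketch extends BaseResourceManagerSketch {

    @Override
    protected void startNewWorker(int numberOfSlots) {
        // standalone mode cannot start workers dynamically; nothing to do
    }

    @Override
    protected void shutDownApplication(String diagnostics) {
        // nothing cluster-specific to release in standalone mode
    }
}
```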

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-4606

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2540.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2540


commit d0e0cea949a8bd2e94c333b2204d90f6ed9e740d
Author: beyond1920 <beyond1...@126.com>
Date:   2016-09-09T01:11:24Z

yarn slot manager

commit 78a3b44040b73d288f67a7b3491ab6abaf673bdb
Author: beyond1920 <beyond1...@126.com>
Date:   2016-09-10T03:29:40Z

integrate with existing FlinkResourceManager






[GitHub] flink issue #2455: [FLINK-4547] [cluster management] when call connect metho...

2016-09-22 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2455
  
@tillrohrmann , I rebased the PR. Thanks.




[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...

2016-09-22 Thread beyond1920
Github user beyond1920 closed the pull request at:

https://github.com/apache/flink/pull/2451




[GitHub] flink pull request #2479: [FLINK-4537] [cluster management] ResourceManager ...

2016-09-22 Thread beyond1920
Github user beyond1920 closed the pull request at:

https://github.com/apache/flink/pull/2479




[GitHub] flink issue #2524: [FLINK-4653] [Client] Refactor JobClientActor to adapt to...

2016-09-21 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2524
  
@mxm , I have already rebased the PR. Thanks.




[GitHub] flink pull request #2524: [FLINK-4653] [Client] Refactor JobClientActor to a...

2016-09-21 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/2524

[FLINK-4653] [Client] Refactor JobClientActor to adapt to new Rpc framework 
and new cluster managerment

The main changes are the following:
1. Create an RpcEndpoint (temporarily named JobInfoTracker) and an RpcGateway (temporarily named JobInfoTrackerGateway) to replace the old JobClientActor.
2. Change the rpc message communication in JobClientActor to rpc method calls to fit the new rpc framework.
3. JobInfoTracker is responsible for waiting for the jobStateChange and the jobResult until the job completes.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-4653

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2524.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2524


commit 5959e94604142784341ba76fb06cccd32473676b
Author: beyond1920 <beyond1...@126.com>
Date:   2016-09-21T06:00:14Z

jobClient refactor

Summary: jobClient refactor for apply to new cluster management

Test Plan: junit

Reviewers: #blink, kete.yangkt

Differential Revision: http://phabricator.taobao.net/D5816






[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-07 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77943543
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -52,15 +58,28 @@
  * 
  */
 public class ResourceManager extends RpcEndpoint {
-   private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
--- End diff --

There is a log field in RpcEndpoint, which is protected, why not use that 
instead?




[GitHub] flink pull request #2479: [FLINK-4537] [cluster management] ResourceManager ...

2016-09-07 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/2479

[FLINK-4537] [cluster management] ResourceManager registration with 
JobManager

This pull request is to implement the ResourceManager registration with the JobManager, which includes:
1. Check whether the input resourceManagerLeaderId is the same as the current leadershipSessionId of the resourceManager. If not, it means that maybe two or more resourceManagers exist at the same time, and the current resourceManager is not the proper rm, so it rejects or ignores the registration.
2. Check whether a valid JobMaster exists at the given address by connecting to the address. Reject the registration from an invalid address. (This is hidden in the connect logic.)
3. Keep the JobID and JobMasterGateway mapping relationships.
4. Start a JobMasterLeaderListener for the given JobID to listen to the leadership of the specified JobMaster.
5. Send a registration-successful ack to the jobMaster.

The main differences are 6 points:
1. Add a getJobMasterLeaderRetriever method to get the job master leader retriever in HighAvailabilityServices, NonHaServices, an inner class in TaskExecutor, and TestingHighAvailabilityServices.
2. Change the registerJobMaster method logic of the ResourceManager based on the above steps (a rough sketch of the validation follows below)
3. Change the input parameters of the registerJobMaster method in the ResourceManager and ResourceManagerGateway classes to be consistent with registerTaskExecutor, from jobMasterRegistration to resourceManagerLeaderId + jobMasterAddress + jobID
4. Change the result type of the registerJobMaster method in the ResourceManager and ResourceManagerGateway classes to be consistent with RetryingRegistration, from org.apache.flink.runtime.resourcemanager.RegistrationResponse to org.apache.flink.runtime.registration.RegistrationResponse
5. Add a LeaderRetrievalListener in the ResourceManager to listen to the leadership of the jobMaster
6. Add a test class for the registerJobMaster method in the ResourceManager
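
A hedged sketch of the leader-session check described in steps 1 and 2 above; the types and helper methods are placeholders, not the actual ResourceManager code:

```java
import java.util.UUID;

class JobMasterRegistrationSketch {

    /** The fencing token of the resource manager that is currently leader. */
    private UUID leaderSessionId;

    /** Accepts the registration only if the caller addressed the current leader. */
    String registerJobMaster(UUID resourceManagerLeaderId, String jobMasterAddress, UUID jobId) {
        if (!leaderSessionId.equals(resourceManagerLeaderId)) {
            // another (stale or competing) resource manager leader was addressed
            return "declined: wrong leader session id";
        }
        // connecting to the address implicitly validates that a JobMaster lives there
        connectToJobMaster(jobMasterAddress);
        rememberJobMaster(jobId, jobMasterAddress);
        return "ack";
    }

    private void connectToJobMaster(String address) { /* omitted in this sketch */ }

    private void rememberJobMaster(UUID jobId, String address) { /* omitted in this sketch */ }
}
```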

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-4537

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2479.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2479


commit fa66ac8ae86745dc9daf1fb07c6c96be4f336c90
Author: beyond1920 <beyond1...@126.com>
Date:   2016-09-01T07:27:20Z

rsourceManager registration with JobManager

commit f5e54a21e4a864b5ac5f2f548b6d3dea3edcb619
Author: beyond1920 <beyond1...@126.com>
Date:   2016-09-07T09:53:44Z

Add JobMasterLeaderRetriverListener at ResourceManager






[GitHub] flink issue #2455: [FLINK-4547] [cluster management] when call connect metho...

2016-09-07 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2455
  
@tillrohrmann @StephanEwen , thanks for your review. I changed the code based on your comments, including two points:
1. Change the JIRA and the PR subject line to better reflect the actual changes.
2. Modify the test case which connects to an invalid address in AkkaRpcServiceTest.




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-07 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77769044
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
@@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway 
resourceManagerGateway) {
 * RPC's main thread to avoid race condition).
 *
 * @param request The detailed request of the slot
+* @return SlotRequestRegistered The confirmation message to be send to 
the caller
 */
-   public void requestSlot(final SlotRequest request) {
+   public SlotRequestRegistered requestSlot(final SlotRequest request) {
+   final AllocationID allocationId = request.getAllocationId();
if (isRequestDuplicated(request)) {
-   LOG.warn("Duplicated slot request, AllocationID:{}", 
request.getAllocationId());
-   return;
+   LOG.warn("Duplicated slot request, AllocationID:{}", 
allocationId);
+   return null;
}
 
// try to fulfil the request with current free slots
-   ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+   final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
if (slot != null) {
LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", slot.getSlotId(),
-   request.getAllocationId(), request.getJobId());
+   allocationId, request.getJobId());
 
// record this allocation in bookkeeping
-   allocationMap.addAllocation(slot.getSlotId(), 
request.getAllocationId());
+   allocationMap.addAllocation(slot.getSlotId(), 
allocationId);
 
// remove selected slot from free pool
freeSlots.remove(slot.getSlotId());
 
-   // TODO: send slot request to TaskManager
+   slot.getTaskExecutorGateway()
+   .requestSlot(allocationId, 
leaderIdRegistry.getLeaderID());
--- End diff --

There are 3 possible responses from the taskExecutor:
1. Ack, which means the taskExecutor gives the slot to the specified jobMaster as expected.
2. Decline, if the slot is already occupied by another AllocationID.
3. Timeout, which could be caused by a lost request or response message, or by slow network transfer.
In the first case, the SlotManager needs to do nothing. However, in the second and third cases, the slotManager should first verify and clear all the previous allocation information for this slot request, then try to find a proper slot for the slot request again. I think we should add logic to handle these 3 possible responses from the taskExecutor (a rough sketch follows below).
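
A hedged sketch of handling the three outcomes with a future-based RPC call, using a Java 9+ CompletableFuture purely for illustration; the actual code at the time used the Flink/Akka future types, and the helper methods here are placeholders:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class SlotRequestHandlingSketch {

    /** Placeholder answer type: true = ack, false = decline; a timeout fails the future. */
    void handleSlotRequestAnswer(CompletableFuture<Boolean> answer, String slotId) {
        answer
            .orTimeout(10, TimeUnit.SECONDS)                 // case 3: no answer in time
            .whenComplete((accepted, failure) -> {
                if (failure == null && Boolean.TRUE.equals(accepted)) {
                    // case 1: ack - nothing to do, the allocation is already booked
                    return;
                }
                // case 2 (decline) or case 3 (timeout): undo bookkeeping and retry
                clearAllocation(slotId);
                retryWithAnotherSlot(slotId);
            });
    }

    private void clearAllocation(String slotId) { /* verify and remove the previous allocation */ }

    private void retryWithAnotherSlot(String slotId) { /* pick another free slot, if any */ }
}
```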




[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-07 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77768256
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
@@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway 
resourceManagerGateway) {
 * RPC's main thread to avoid race condition).
 *
 * @param request The detailed request of the slot
+* @return SlotRequestRegistered The confirmation message to be send to 
the caller
 */
-   public void requestSlot(final SlotRequest request) {
+   public SlotRequestRegistered requestSlot(final SlotRequest request) {
+   final AllocationID allocationId = request.getAllocationId();
if (isRequestDuplicated(request)) {
-   LOG.warn("Duplicated slot request, AllocationID:{}", 
request.getAllocationId());
-   return;
+   LOG.warn("Duplicated slot request, AllocationID:{}", 
allocationId);
+   return null;
}
 
// try to fulfil the request with current free slots
-   ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+   final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
if (slot != null) {
LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", slot.getSlotId(),
-   request.getAllocationId(), request.getJobId());
+   allocationId, request.getJobId());
 
// record this allocation in bookkeeping
-   allocationMap.addAllocation(slot.getSlotId(), 
request.getAllocationId());
+   allocationMap.addAllocation(slot.getSlotId(), 
allocationId);
 
// remove selected slot from free pool
freeSlots.remove(slot.getSlotId());
 
-   // TODO: send slot request to TaskManager
+   slot.getTaskExecutorGateway()
+   .requestSlot(allocationId, 
leaderIdRegistry.getLeaderID());
--- End diff --

The ResourceManager keeps a mapping between resourceID and TaskExecutorGateway. Maybe we could fetch the TaskExecutorGateway by resourceID via the ResourceManager here?




[GitHub] flink issue #2451: [FLINK-4535] [cluster management] resourceManager process...

2016-09-06 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2451
  
@tillrohrmann , thanks for your review. I changed the pr based on your advice, including:
1. Modify UnmatchedLeaderSessionIDException to LeaderSessionIDException, format the code style and method comments, and add checkNotNull to the constructor
2. Format the code style and method comments of ResourceManager
3. Group the TaskExecutorGateway and the InstanceID into a TaskExecutorRegistration
4. Modify the test cases which are expected to throw exceptions in ResourceManagerTest

Besides, I don't understand this comment of yours: "Should we fail or decline the registration here? So either sending an exception or a RegistrationResponse.Decline message.". Do you mean you would prefer to send a Decline message?




[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...

2016-09-06 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2451#discussion_r77757943
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
 ---
@@ -138,22 +150,46 @@ public SlotAssignment requestSlot(SlotRequest 
slotRequest) {
 
 
/**
-*
-* @param resourceManagerLeaderId  The fencing token for the 
ResourceManager leader 
+* Register a taskExecutor at the resource manager
+* @param resourceManagerLeaderId  The fencing token for the 
ResourceManager leader
 * @param taskExecutorAddress  The address of the TaskExecutor that 
registers
 * @param resourceID   The resource ID of the TaskExecutor 
that registers
 *
 * @return The response by the ResourceManager.
 */
@RpcMethod
-   public org.apache.flink.runtime.rpc.registration.RegistrationResponse 
registerTaskExecutor(
-   UUID resourceManagerLeaderId,
-   String taskExecutorAddress,
-   ResourceID resourceID) {
+   public Future registerTaskExecutor(
+   final UUID resourceManagerLeaderId,
+   final String taskExecutorAddress,
+   final ResourceID resourceID) {
+
+   if(!leaderSessionID.equals(resourceManagerLeaderId)) {
+   log.warn("Discard registration from TaskExecutor {} at 
({}) because the expected leader session ID {} did not equal the received 
leader session ID  {}",
+   resourceID, taskExecutorAddress, 
leaderSessionID, resourceManagerLeaderId);
+   return Futures.failed(new 
UnmatchedLeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
--- End diff --

Do you prefer to send a Decline message under the condition?




[GitHub] flink pull request #2455: [FLINK-4547] [cluster management] when call connec...

2016-09-06 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2455#discussion_r77754084
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
 ---
@@ -189,7 +191,49 @@ public void stop() {
rpcEndpoint.tell(Processing.STOP, ActorRef.noSender());
}
 
-   // 

+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+
+   if (o == null) {
+   return false;
+   }
+
+   if(Proxy.isProxyClass(o.getClass())) {
+   return o.equals(this);
+   }
--- End diff --

@tillrohrmann , it does not mean that AkkaInvocationHandler is a proxy class. In fact it means that if the class of the input parameter is a proxy class, then we return the result of o.equals(this). Here is my reasoning: when calling the connect method of AkkaRpcService, the returned gateway, which is wrapped in a Future, is in fact a Proxy. When I call the equals method to compare two gateways, the equals method of AkkaInvocationHandler is called, but the input parameter is still another gateway whose class is the Proxy class instead of AkkaInvocationHandler (a rough sketch follows below).
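
For context, a minimal illustrative sketch (not the actual AkkaInvocationHandler code; the class name and the field used for comparison are assumptions) of how a dynamic-proxy gateway can implement equals, including unwrapping the other proxy's invocation handler:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Objects;

/**
 * Sketch of a dynamic-proxy handler whose equals/hashCode are based on the
 * remote address, so two gateway proxies obtained for the same address
 * compare as equal.
 */
class GatewayHandlerSketch implements InvocationHandler {

    private final String address;

    GatewayHandlerSketch(String address) {
        this.address = address;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        // equals/hashCode called on the proxy end up here, with the *other proxy* as argument
        if ("equals".equals(method.getName())) {
            return equals(args[0]);
        } else if ("hashCode".equals(method.getName())) {
            return hashCode();
        }
        return null; // the actual remote invocation is omitted in this sketch
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o != null && Proxy.isProxyClass(o.getClass())) {
            // unwrap the other proxy and compare the underlying handlers
            o = Proxy.getInvocationHandler(o);
        }
        return o instanceof GatewayHandlerSketch
            && Objects.equals(address, ((GatewayHandlerSketch) o).address);
    }

    @Override
    public int hashCode() {
        return Objects.hash(address);
    }
}
```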




[GitHub] flink pull request #2455: [FLINK-4547] [cluster management] Return same obje...

2016-09-06 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2455#discussion_r77753174
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
 ---
@@ -189,7 +191,49 @@ public void stop() {
rpcEndpoint.tell(Processing.STOP, ActorRef.noSender());
}
 
-   // 

+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+
+   if (o == null) {
+   return false;
+   }
+
+   if(Proxy.isProxyClass(o.getClass())) {
+   return o.equals(this);
+   }
--- End diff --

@StephanEwen , as Till said, the subject of this pr is misleading. I mean that when calling the connect method of AkkaRpcService with the same address and the same rpc gateway class, the returned gateways are equal, rather than being the same gateway object. I changed the subject.




[GitHub] flink pull request #2455: [FLINK-4547] [cluster management] Return same obje...

2016-09-01 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/2455

[FLINK-4547] [cluster management] Return same object when call connect 
method in AkkaRpcService using same address and same rpc gateway class

The pull request adds equals and hashCode methods to the AkkaInvocationHandler class.
Currently, every time the connect method of AkkaRpcService is called with the same address and the same rpc gateway class, the returned gateway objects are completely different from each other: their equals and hashCode results do not match. Maybe it's reasonable to get the same result (equals returns true and the hash codes are the same) when using the same address and the same gateway class.

The main differences are 2 points:
1. Add equals and hashCode methods to the AkkaInvocationHandler class
2. Add a test for the connect method in AkkaRpcService


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-4547

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2455.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2455


commit 8504f1075b4c4c51f37aab2b45b864f885dc76a3
Author: beyond1920 <beyond1...@126.com>
Date:   2016-09-01T07:00:03Z

add equals and hashcode in AkkaInvocationHandler






[GitHub] flink pull request #2451: [FLINK-4535] [cluster management] resourceManager ...

2016-09-01 Thread beyond1920
GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/2451

[FLINK-4535] [cluster management] resourceManager process the registration 
from TaskExecutor

This pull request is to implement ResourceManager registration with 
TaskExecutor, which including:
1. Check whether input resourceManagerLeaderId is as same as the current 
leadershipSessionId of resourceManager. If not, it means that maybe two or more 
resourceManager exists at the same time, and current resourceManager is not the 
proper rm. so it rejects or ignores the registration.
2. Check whether exists a valid taskExecutor at the giving address by 
connecting to the address. Reject the registration from invalid address. (which 
is hidden in the connect method)
3. Keep resourceID and taskExecutorGateway mapping relationships, And 
optionally keep resourceID and container mapping relationships in yarn mode.
4. Send registration successful ack to the taskExecutor.

Main difference are 3 points:
1. Add UnmatchedLeaderSessionIDException to specify that received leader 
session ID is not as same as expected.
2. Change registerTaskExecutor method  of ResourceManager
3. Add a test class for ResourceManager

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-4535

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2451.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2451


commit fa795ca7a992859398ed30180e50ef036a93b355
Author: beyond1920 <beyond1...@126.com>
Date:   2016-09-01T03:14:00Z

resourceManager process the registration from TaskExecutor






[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...

2016-08-31 Thread beyond1920
Github user beyond1920 closed the pull request at:

https://github.com/apache/flink/pull/2427




[GitHub] flink issue #2427: [FLINK-4516] [cluster management]leader election of resou...

2016-08-31 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2427
  
Hi @mxm , thanks so much. I have already closed the PR.




[GitHub] flink issue #2427: [FLINK-4516] [cluster management]leader election of resou...

2016-08-31 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2427
  
@mxm , thanks for your review. I changed the ResourceManagerHATest to use a mocked RpcService instead of TestingSerialRpcService.




[GitHub] flink pull request #2435: [FLINK-4478] [flip-6] Add HeartbeatManager

2016-08-31 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2435#discussion_r76938462
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import scala.concurrent.Future;
+
+/**
+ * Interface for the interaction with the {@link HeartbeatManager}. The 
heartbeat listener is used
+ * for the following things:
+ * 
+ * 
+ * Notifications about heartbeat timeouts
+ * Payload reports of incoming heartbeats
+ * Retrieval of payloads for outgoing heartbeats
+ * 
+ * @param  Type of the incoming payload
+ * @param  Type of the outgoing payload
+ */
+public interface HeartbeatListener<I, O> {
+
+   /**
+* Callback which is called if a heartbeat for the machine identified 
by the given resource
+* ID times out.
+*
+* @param resourceID Resource ID of the machine whose heartbeat has 
timed out
+*/
+   void notifyHeartbeatTimeout(ResourceID resourceID);
--- End diff --

It seems that JobManager and ResourceManager don't have ResourceID, only 
TaskExecutor has ResourceID. Would it be more proper to use something else to 
identify the heartbeat target?



