[GitHub] [flink] JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects

2019-10-28 Thread GitBox
JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] 
Add methods for interacting with temporary objects
URL: https://github.com/apache/flink/pull/9971#discussion_r339441722
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
 ##
 @@ -73,7 +74,14 @@ abstract class TableEnvImpl(
 new TableReferenceLookup {
   override def lookupTable(name: String): 
Optional[TableReferenceExpression] = {
 JavaScalaConversionUtil
-  .toJava(scanInternal(Array(name)).map(t => new 
TableReferenceExpression(name, t)))
+  .toJava(
+Try({
+  val unresolvedIdentifier = UnresolvedIdentifier.of(name)
 
 Review comment:
   Got it. You can put these comments to code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects

2019-10-28 Thread GitBox
JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] 
Add methods for interacting with temporary objects
URL: https://github.com/apache/flink/pull/9971#discussion_r339436930
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
 ##
 @@ -60,37 +65,98 @@ public ConnectTableDescriptor withSchema(Schema schema) {
 * Searches for the specified table source, configures it accordingly, 
and registers it as
 * a table under the given name.
 *
+* Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+* be inaccessible in the current session. To make the permanent object 
available again you can drop the
+* corresponding temporary object.
+*
 * @param name table name to be registered in the table environment
+* @deprecated use {@link #createTemporaryTable(String)}
 */
+   @Deprecated
public void registerTableSource(String name) {
Preconditions.checkNotNull(name);
TableSource tableSource = 
TableFactoryUtil.findAndCreateTableSource(this);
-   tableEnv.registerTableSource(name, tableSource);
+   registration.createTableSource(name, tableSource);
}
 
/**
 * Searches for the specified table sink, configures it accordingly, 
and registers it as
 * a table under the given name.
 *
+* Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+* be inaccessible in the current session. To make the permanent object 
available again you can drop the
+* corresponding temporary object.
+*
 * @param name table name to be registered in the table environment
+* @deprecated use {@link #createTemporaryTable(String)}
 */
+   @Deprecated
public void registerTableSink(String name) {
Preconditions.checkNotNull(name);
TableSink tableSink = 
TableFactoryUtil.findAndCreateTableSink(this);
-   tableEnv.registerTableSink(name, tableSink);
+   registration.createTableSink(name, tableSink);
}
 
/**
 * Searches for the specified table source and sink, configures them 
accordingly, and registers
 * them as a table under the given name.
 *
+* Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+* be inaccessible in the current session. To make the permanent object 
available again you can drop the
+* corresponding temporary object.
+*
 * @param name table name to be registered in the table environment
+* @deprecated use {@link #createTemporaryTable(String)}
 */
+   @Deprecated
public void registerTableSourceAndSink(String name) {
registerTableSource(name);
registerTableSink(name);
}
 
+   /**
+* Registers the table described by underlying properties in a given 
path.
+*
+* There is no distinction between source and sink at the descriptor 
level anymore as this
+* method does not perform actual class lookup. It only stores the 
underlying properties. The
+* actual source/sink lookup is performed when the table is used.
+*
+* Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+* be inaccessible in the current session. To make the permanent object 
available again you can drop the
+* corresponding temporary object.
+*
+* NOTE: The schema must be explicitly defined.
+*
+* @param path path where to register the temporary table
+*/
+   public void createTemporaryTable(String path) {
+   if (schemaDescriptor == null) {
+   throw new TableException(
+   "Table schema must be explicitly defined. To 
derive schema from the underlying connector" +
+   " use 
registerTableSource/registerTableSink/registerTableSourceAndSink.");
+   }
+
+   Map schemaProperties = 
schemaDescriptor.toProperties();
+   TableSchema tableSchema = getTableSchema(schemaProperties);
+
+   Map properties = new HashMap<>(toProperties());
+   schemaProperties.keySet().forEach(properties::remove);
+
+   CatalogTableImpl catalogTable = new CatalogTableImpl(
 
 Review comment:
   OK.
   After https://issues.apache.org/jira/browse/FLINK-14381 , we removed 
partition support for temporary table for clean up 
`PartitionableTableSource/Sink`.
   I created JIRA to track it: 
https://issues.apache.org/jira/browse/FLINK-14543 , will fixed it in 1.10.


[GitHub] [flink] JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] 
Add methods for interacting with temporary objects
URL: https://github.com/apache/flink/pull/9971#discussion_r339395831
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -501,6 +537,11 @@ public void alterTable(CatalogBaseTable table, 
ObjectIdentifier objectIdentifier
 *  does not exist.
 */
public void dropTable(ObjectIdentifier objectIdentifier, boolean 
ignoreIfNotExists) {
+   if (temporaryTables.containsKey(objectIdentifier)) {
 
 Review comment:
   Just curious about this, I took a look to spark:
   ```
  * If a database is specified in `name`, this will drop the table from 
that database.
  * If no database is specified, this will first attempt to drop a 
temporary view with
  * the same name, then, if that does not exist, drop the table from the 
current database.
   ```
   Curious about other databases, I don't know how they behave.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] 
Add methods for interacting with temporary objects
URL: https://github.com/apache/flink/pull/9971#discussion_r339393920
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
 ##
 @@ -60,37 +65,98 @@ public ConnectTableDescriptor withSchema(Schema schema) {
 * Searches for the specified table source, configures it accordingly, 
and registers it as
 * a table under the given name.
 *
+* Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+* be inaccessible in the current session. To make the permanent object 
available again you can drop the
+* corresponding temporary object.
+*
 * @param name table name to be registered in the table environment
+* @deprecated use {@link #createTemporaryTable(String)}
 */
+   @Deprecated
public void registerTableSource(String name) {
Preconditions.checkNotNull(name);
TableSource tableSource = 
TableFactoryUtil.findAndCreateTableSource(this);
-   tableEnv.registerTableSource(name, tableSource);
+   registration.createTableSource(name, tableSource);
}
 
/**
 * Searches for the specified table sink, configures it accordingly, 
and registers it as
 * a table under the given name.
 *
+* Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+* be inaccessible in the current session. To make the permanent object 
available again you can drop the
+* corresponding temporary object.
+*
 * @param name table name to be registered in the table environment
+* @deprecated use {@link #createTemporaryTable(String)}
 */
+   @Deprecated
public void registerTableSink(String name) {
Preconditions.checkNotNull(name);
TableSink tableSink = 
TableFactoryUtil.findAndCreateTableSink(this);
-   tableEnv.registerTableSink(name, tableSink);
+   registration.createTableSink(name, tableSink);
}
 
/**
 * Searches for the specified table source and sink, configures them 
accordingly, and registers
 * them as a table under the given name.
 *
+* Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+* be inaccessible in the current session. To make the permanent object 
available again you can drop the
+* corresponding temporary object.
+*
 * @param name table name to be registered in the table environment
+* @deprecated use {@link #createTemporaryTable(String)}
 */
+   @Deprecated
public void registerTableSourceAndSink(String name) {
registerTableSource(name);
registerTableSink(name);
}
 
+   /**
+* Registers the table described by underlying properties in a given 
path.
+*
+* There is no distinction between source and sink at the descriptor 
level anymore as this
+* method does not perform actual class lookup. It only stores the 
underlying properties. The
+* actual source/sink lookup is performed when the table is used.
+*
+* Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+* be inaccessible in the current session. To make the permanent object 
available again you can drop the
+* corresponding temporary object.
+*
+* NOTE: The schema must be explicitly defined.
+*
+* @param path path where to register the temporary table
+*/
+   public void createTemporaryTable(String path) {
+   if (schemaDescriptor == null) {
+   throw new TableException(
+   "Table schema must be explicitly defined. To 
derive schema from the underlying connector" +
+   " use 
registerTableSource/registerTableSink/registerTableSourceAndSink.");
+   }
+
+   Map schemaProperties = 
schemaDescriptor.toProperties();
+   TableSchema tableSchema = getTableSchema(schemaProperties);
+
+   Map properties = new HashMap<>(toProperties());
+   schemaProperties.keySet().forEach(properties::remove);
+
+   CatalogTableImpl catalogTable = new CatalogTableImpl(
 
 Review comment:
   `CatalogTable` has `partitionKeys` too, consider add partition keys in 
`CatalogTableImpl.toProperties` and parse partition keys here?
   (We can add later too)


This is an automated message from the Apache Git Service.

[GitHub] [flink] JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] 
Add methods for interacting with temporary objects
URL: https://github.com/apache/flink/pull/9971#discussion_r339392415
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
 ##
 @@ -73,7 +74,14 @@ abstract class TableEnvImpl(
 new TableReferenceLookup {
   override def lookupTable(name: String): 
Optional[TableReferenceExpression] = {
 JavaScalaConversionUtil
-  .toJava(scanInternal(Array(name)).map(t => new 
TableReferenceExpression(name, t)))
+  .toJava(
+Try({
+  val unresolvedIdentifier = UnresolvedIdentifier.of(name)
 
 Review comment:
   Why use `Try`? If it is an illegal name, should we throw exception?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] 
Add methods for interacting with temporary objects
URL: https://github.com/apache/flink/pull/9971#discussion_r339392857
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
 ##
 @@ -363,15 +371,26 @@ abstract class TableEnvImpl(
 
   @throws[TableException]
   override def scan(tablePath: String*): Table = {
-scanInternal(tablePath.toArray) match {
+val unresolvedIdentifier = UnresolvedIdentifier.of(tablePath: _*)
+scanInternal(unresolvedIdentifier) match {
   case Some(table) => createTable(table)
   case None => throw new TableException(s"Table 
'${tablePath.mkString(".")}' was not found.")
 }
   }
 
-  private[flink] def scanInternal(tablePath: Array[String]): 
Option[CatalogQueryOperation] = {
-val unresolvedIdentifier = UnresolvedIdentifier.of(tablePath: _*)
-val objectIdentifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier)
+  override def from(path: String): Table = {
+val parser = planningConfigurationBuilder.createCalciteParser()
+val unresolvedIdentifier = 
UnresolvedIdentifier.of(parser.parseIdentifier(path).names: _*)
+scanInternal(unresolvedIdentifier) match {
+  case Some(table) => createTable(table)
+  case None => throw new TableException(s"Table '$path' was not found.")
 
 Review comment:
   Can we have a unify exception message? Use `unresolvedIdentifier`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] Add methods for interacting with temporary objects

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] 
Add methods for interacting with temporary objects
URL: https://github.com/apache/flink/pull/9971#discussion_r339393982
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
 ##
 @@ -60,37 +65,98 @@ public ConnectTableDescriptor withSchema(Schema schema) {
 * Searches for the specified table source, configures it accordingly, 
and registers it as
 * a table under the given name.
 *
+* Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+* be inaccessible in the current session. To make the permanent object 
available again you can drop the
+* corresponding temporary object.
+*
 * @param name table name to be registered in the table environment
+* @deprecated use {@link #createTemporaryTable(String)}
 */
+   @Deprecated
public void registerTableSource(String name) {
Preconditions.checkNotNull(name);
TableSource tableSource = 
TableFactoryUtil.findAndCreateTableSource(this);
-   tableEnv.registerTableSource(name, tableSource);
+   registration.createTableSource(name, tableSource);
}
 
/**
 * Searches for the specified table sink, configures it accordingly, 
and registers it as
 * a table under the given name.
 *
+* Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+* be inaccessible in the current session. To make the permanent object 
available again you can drop the
+* corresponding temporary object.
+*
 * @param name table name to be registered in the table environment
+* @deprecated use {@link #createTemporaryTable(String)}
 */
+   @Deprecated
public void registerTableSink(String name) {
Preconditions.checkNotNull(name);
TableSink tableSink = 
TableFactoryUtil.findAndCreateTableSink(this);
-   tableEnv.registerTableSink(name, tableSink);
+   registration.createTableSink(name, tableSink);
}
 
/**
 * Searches for the specified table source and sink, configures them 
accordingly, and registers
 * them as a table under the given name.
 *
+* Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+* be inaccessible in the current session. To make the permanent object 
available again you can drop the
+* corresponding temporary object.
+*
 * @param name table name to be registered in the table environment
+* @deprecated use {@link #createTemporaryTable(String)}
 */
+   @Deprecated
public void registerTableSourceAndSink(String name) {
registerTableSource(name);
registerTableSink(name);
}
 
+   /**
+* Registers the table described by underlying properties in a given 
path.
+*
+* There is no distinction between source and sink at the descriptor 
level anymore as this
+* method does not perform actual class lookup. It only stores the 
underlying properties. The
+* actual source/sink lookup is performed when the table is used.
+*
+* Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+* be inaccessible in the current session. To make the permanent object 
available again you can drop the
+* corresponding temporary object.
+*
+* NOTE: The schema must be explicitly defined.
+*
+* @param path path where to register the temporary table
+*/
+   public void createTemporaryTable(String path) {
+   if (schemaDescriptor == null) {
+   throw new TableException(
+   "Table schema must be explicitly defined. To 
derive schema from the underlying connector" +
+   " use 
registerTableSource/registerTableSink/registerTableSourceAndSink.");
+   }
+
+   Map schemaProperties = 
schemaDescriptor.toProperties();
+   TableSchema tableSchema = getTableSchema(schemaProperties);
+
+   Map properties = new HashMap<>(toProperties());
+   schemaProperties.keySet().forEach(properties::remove);
+
+   CatalogTableImpl catalogTable = new CatalogTableImpl(
 
 Review comment:
   Maybe we can use `CatalogTableBuilder` here?
   Looks like there some bugs in `CatalogTableBuilder`, it not remove 
`schemaProperties` in `properties`?


This is an automated message from the Apache Git Service.
To