[GitHub] xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition.

2018-10-24 Thread GitBox
xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition.
URL: https://github.com/apache/flink/pull/6906#discussion_r227924642
 
 

 ##
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
 ##
 @@ -120,4 +122,122 @@ class InMemoryExternalCatalog(name: String) extends CrudExternalCatalog {
   override def listSubCatalogs(): JList[String] = synchronized {
     databases.keys.toList.asJava
   }
+
+  /**
+    * Adds a partition to an external catalog table.
+    *
+    * @param tableName      table name
+    * @param partition      description of the partition to create
+    * @param ignoreIfExists if the partition already exists in the catalog, do not throw an
+    *                       exception and keep the existing partition when ignoreIfExists is true;
+    *                       otherwise throw PartitionAlreadyExistException
+    * @throws TableNotExistException         if the table does not exist in the catalog yet
+    * @throws PartitionAlreadyExistException if the partition exists in the catalog and
+    *                                        ignoreIfExists is false
+    */
+  override def createPartition(
+      tableName: String,
+      partition: ExternalCatalogPartition,
+      ignoreIfExists: Boolean): Unit = synchronized {
+    val newPartSpec = partition.partitionSpec
+    val table = getTable(tableName)
+    val partitions = getPartitions(tableName, table)
+    if (partitions.contains(newPartSpec)) {
+      if (!ignoreIfExists) {
+        throw new PartitionAlreadyExistException(name, tableName, newPartSpec)
+      }
+    } else {
+      partitions.put(newPartSpec, partition)
 
 Review comment:
   We might want to validate that the partitionSpec is what the table expects. Otherwise, we might insert a partition whose partition columns are different from how the table is partitioned.
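
   A rough sketch of such a check, assuming the partitioned table exposes its partition column names (the parameter names and the exception type here are illustrative, not part of this PR):

     import java.util.{LinkedHashMap => JLinkedHashMap, LinkedHashSet => JLinkedHashSet}

     // Hypothetical check: the spec must name exactly the columns the table is partitioned by.
     private def validatePartitionSpec(
         tableName: String,
         expectedColumns: JLinkedHashSet[String],  // e.g. taken from the partitioned table
         partSpec: JLinkedHashMap[String, String]): Unit = {
       if (!expectedColumns.equals(partSpec.keySet())) {
         throw new IllegalArgumentException(
           s"Partition spec ${partSpec.keySet()} does not match partition columns " +
             s"$expectedColumns of table $tableName")
       }
     }

   createPartition could then call something like this right after resolving the table.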




[GitHub] xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition.

2018-10-24 Thread GitBox
xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition.
URL: https://github.com/apache/flink/pull/6906#discussion_r227920287
 
 

 ##
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
 ##
 @@ -120,4 +122,122 @@ class InMemoryExternalCatalog(name: String) extends CrudExternalCatalog {
   override def listSubCatalogs(): JList[String] = synchronized {
     databases.keys.toList.asJava
   }
+
+  /**
+    * Adds a partition to an external catalog table.
+    *
+    * @param tableName      table name
+    * @param partition      description of the partition to create
+    * @param ignoreIfExists if the partition already exists in the catalog, do not throw an
+    *                       exception and keep the existing partition when ignoreIfExists is true;
+    *                       otherwise throw PartitionAlreadyExistException
+    * @throws TableNotExistException         if the table does not exist in the catalog yet
+    * @throws PartitionAlreadyExistException if the partition exists in the catalog and
+    *                                        ignoreIfExists is false
+    */
+  override def createPartition(
+      tableName: String,
+      partition: ExternalCatalogPartition,
+      ignoreIfExists: Boolean): Unit = synchronized {
+    val newPartSpec = partition.partitionSpec
+    val table = getTable(tableName)
+    val partitions = getPartitions(tableName, table)
+    if (partitions.contains(newPartSpec)) {
+      if (!ignoreIfExists) {
+        throw new PartitionAlreadyExistException(name, tableName, newPartSpec)
+      }
+    } else {
+      partitions.put(newPartSpec, partition)
+    }
+  }
+
+  private def getPartitions(tableName: String, table: ExternalCatalogTable)
+    : mutable.HashMap[JLinkedHashMap[String, String], ExternalCatalogPartition] = table match {
+    case t: ExternalCatalogPartitionedTable =>
+      partitions.getOrElseUpdate(
+        tableName,
+        new mutable.HashMap[JLinkedHashMap[String, String], ExternalCatalogPartition])
+    case _ => throw new UnsupportedOperationException(
 
 Review comment:
   It might be better to define an exception class called TableNotPartitioned, similar to TableNotExistException, so this exception can be handled explicitly.
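
   A possible shape for such an exception, modeled loosely on the existing catalog exceptions (the class name, base class, and constructor signature are assumptions, not settled API):

     // Hypothetical exception indicating that a table exists but is not partitioned.
     case class TableNotPartitionedException(
         catalog: String,
         table: String,
         cause: Throwable)
       extends RuntimeException(
         s"Table $table in catalog $catalog is not a partitioned table.", cause) {

       def this(catalog: String, table: String) = this(catalog, table, null)
     }

   The case _ branch above could then throw this instead of the generic UnsupportedOperationException.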




[GitHub] xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition.

2018-10-24 Thread GitBox
xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition.
URL: https://github.com/apache/flink/pull/6906#discussion_r227915200
 
 

 ##
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 ##
 @@ -282,34 +296,63 @@ class ExternalCatalogTableBuilder(private val connectorDescriptor: ConnectorDesc
     this
   }
 
+  /**
+    * Specifies the partition columns for this external table.
+    */
+  def withPartitionColumnNames(
+      partitionColumnNames: java.util.LinkedHashSet[String]): ExternalCatalogTableBuilder = {
+    require(partitionColumnNames != null && !partitionColumnNames.isEmpty)
+    this.partitionColumnNames = Some(partitionColumnNames)
+    this
+  }
+
   /**
     * Declares this external table as a table source and returns the
     * configured [[ExternalCatalogTable]].
     *
     * @return External catalog table
     */
-  def asTableSource(): ExternalCatalogTable = {
-    new ExternalCatalogTable(
-      isBatch,
-      isStreaming,
-      isSource = true,
-      isSink = false,
-      DescriptorProperties.toJavaMap(this))
-  }
+  def asTableSource(): ExternalCatalogTable = this.partitionColumnNames match {
+    case Some(pc) =>
+      new ExternalCatalogPartitionedTable(
+        isBatch,
+        isStreaming,
+        isSource = true,
+        isSink = false,
+        pc,
+        DescriptorProperties.toJavaMap(this))
+    case None =>
+      new ExternalCatalogTable(
+        isBatch,
+        isStreaming,
+        isSource = true,
+        isSink = false,
+        DescriptorProperties.toJavaMap(this))
+  }
 
   /**
     * Declares this external table as a table sink and returns the
     * configured [[ExternalCatalogTable]].
     *
     * @return External catalog table
     */
-  def asTableSink(): ExternalCatalogTable = {
-    new ExternalCatalogTable(
-      isBatch,
-      isStreaming,
-      isSource = false,
-      isSink = true,
-      DescriptorProperties.toJavaMap(this))
+  def asTableSink(): ExternalCatalogTable = this.partitionColumnNames match {
 Review comment:
   I see a repeated pattern in the three asXXX methods. While it's not introduced in this PR, it might be good to introduce a helper method that those asXXX methods call, to minimize the repetition.
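
   For example, a rough sketch of such a helper, derived from the two branches above (the name and exact shape are illustrative only):

     // Hypothetical helper: builds either a partitioned or a plain external catalog table,
     // depending on whether partition columns were specified on the builder.
     private def buildTable(isSource: Boolean, isSink: Boolean): ExternalCatalogTable =
       this.partitionColumnNames match {
         case Some(pc) =>
           new ExternalCatalogPartitionedTable(
             isBatch, isStreaming, isSource, isSink, pc,
             DescriptorProperties.toJavaMap(this))
         case None =>
           new ExternalCatalogTable(
             isBatch, isStreaming, isSource, isSink,
             DescriptorProperties.toJavaMap(this))
       }

     // The asXXX methods would then reduce to one-liners, e.g.:
     def asTableSource(): ExternalCatalogTable = buildTable(isSource = true, isSink = false)
     def asTableSink(): ExternalCatalogTable = buildTable(isSource = false, isSink = true)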




[GitHub] xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition.

2018-10-24 Thread GitBox
xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition.
URL: https://github.com/apache/flink/pull/6906#discussion_r227908967
 
 

 ##
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
 ##
 @@ -31,6 +31,32 @@ import org.apache.flink.table.api._
   */
 trait ExternalCatalog {
 
+  /**
+    * Gets a partition from the external catalog.
+    *
+    * @param tableName table name
+    * @param partSpec  partition specification
+    * @throws TableNotExistException     if the table does not exist in the catalog yet
+    * @throws PartitionNotExistException if the partition does not exist in the catalog yet
+    * @return found partition
+    */
+  @throws[TableNotExistException]
+  @throws[PartitionNotExistException]
+  def getPartition(
+      tableName: String,
+      partSpec: JLinkedHashMap[String, String]): ExternalCatalogPartition
+
+  /**
+    * Gets the partition specification list of a table from the external catalog.
+    *
+    * @param tableName table name
+    * @throws CatalogNotExistException   if the database does not exist in the catalog yet
 
 Review comment:
   This (CatalogNotExistException) seems not needed.



