xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let 
catalog support partition.
URL: https://github.com/apache/flink/pull/6906#discussion_r227920287
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
 ##########
 @@ -120,4 +122,122 @@ class InMemoryExternalCatalog(name: String) extends 
CrudExternalCatalog {
   override def listSubCatalogs(): JList[String] = synchronized {
     databases.keys.toList.asJava
   }
+
+  /**
+    * Adds partition into an external Catalog table
+    *
+    * @param tableName      table name
+    * @param partition      partition description of partition which to create
+    * @param ignoreIfExists if partition already exists in the catalog, not 
throw exception and
+    *                       leave the existed partition if ignoreIfExists is 
true;
+    *                       else throw PartitionAlreadyExistException
+    * @throws TableNotExistException         if table does not exist in the 
catalog yet
+    * @throws PartitionAlreadyExistException if partition exists in the 
catalog and
+    *                                        ignoreIfExists is false
+    */
+  override def createPartition(
+    tableName: String,
+    partition: ExternalCatalogPartition,
+    ignoreIfExists: Boolean): Unit = synchronized {
+    val newPartSpec = partition.partitionSpec
+    val table = getTable(tableName)
+    val partitions = getPartitions(tableName, table)
+    if (partitions.contains(newPartSpec)) {
+      if (!ignoreIfExists) {
+        throw new PartitionAlreadyExistException(name, tableName, newPartSpec)
+      }
+    } else {
+      partitions.put(newPartSpec, partition)
+    }
+  }
+
+  private def getPartitions(tableName: String, table: ExternalCatalogTable)
+  : mutable.HashMap[JLinkedHashMap[String, String], ExternalCatalogPartition] 
= table match {
+    case t: ExternalCatalogPartitionedTable =>
+      partitions.getOrElseUpdate(
+        tableName, new mutable.HashMap[JLinkedHashMap[String, String], 
ExternalCatalogPartition])
+    case _ => throw new UnsupportedOperationException(
 
 Review comment:
   It might be better to define a dedicated exception class called 
TableNotPartitioned, similar to TableNotExistException, so that callers can 
handle this case explicitly instead of catching a generic 
UnsupportedOperationException.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to