Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-11 Thread via GitHub


lsyldliu merged PR #24760:
URL: https://github.com/apache/flink/pull/24760


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-11 Thread via GitHub


xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1597382234


##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##
@@ -236,6 +252,213 @@ void testCreateMaterializedTableInFullMode() {
 "Only support create materialized table in continuous refresh mode currently.");
 }
 
+@Test
+void testAlterMaterializedTableRefresh() throws Exception {
+long timeout = Duration.ofSeconds(20).toMillis();
+long pause = Duration.ofSeconds(2).toMillis();
+// initialize session handle, create test-filesystem catalog and register it to catalog
+// store
+SessionHandle sessionHandle = initializeSession();
+
+List<Row> data = new ArrayList<>();
+data.add(Row.of(1L, 1L, 1L, "2024-01-01"));
+data.add(Row.of(2L, 2L, 2L, "2024-01-02"));
+data.add(Row.of(3L, 3L, 3L, "2024-01-02"));
+String dataId = TestValuesTableFactory.registerData(data);
+
+String sourceDdl =
+String.format(
+"CREATE TABLE my_source (\n"
++ "  order_id BIGINT,\n"
++ "  user_id BIGINT,\n"
++ "  shop_id BIGINT,\n"
++ "  order_created_at STRING\n"
++ ")\n"
++ "WITH (\n"
++ "  'connector' = 'values',\n"
++ "  'bounded' = 'true',\n"
++ "  'data-id' = '%s'\n"
++ ")",
+dataId);
+service.executeStatement(sessionHandle, sourceDdl, -1, new Configuration());
+
+String materializedTableDDL =
+"CREATE MATERIALIZED TABLE my_materialized_table"
++ " PARTITIONED BY (ds)\n"
++ " WITH(\n"
++ "   'format' = 'debezium-json'\n"
++ " )\n"
++ " FRESHNESS = INTERVAL '2' SECOND\n"
++ " AS SELECT \n"
++ "  user_id,\n"
++ "  shop_id,\n"
++ "  ds,\n"
++ "  COUNT(order_id) AS order_cnt\n"
++ " FROM (\n"
+"SELECT user_id, shop_id, order_created_at AS ds, order_id FROM my_source"
++ " ) AS tmp\n"
++ " GROUP BY (user_id, shop_id, ds)";
+
+OperationHandle materializedTableHandle =
+service.executeStatement(
+sessionHandle, materializedTableDDL, -1, new Configuration());
+awaitOperationTermination(service, sessionHandle, materializedTableHandle);
+
+// verify data exists in materialized table
+CommonTestUtils.waitUtil(
+() ->
+fetchTableData(sessionHandle, "SELECT * FROM my_materialized_table").size()
+== data.size(),
+Duration.ofMillis(timeout),
+Duration.ofMillis(pause),
+"Failed to verify the data in materialized table.");
+assertThat(
+fetchTableData(
+sessionHandle,
+"SELECT * FROM my_materialized_table where ds = '2024-01-02'")
+.size())
+.isEqualTo(2);
+
+// remove the last element
+data.remove(2);
+
+long currentTime = System.currentTimeMillis();
+String alterStatement =
+"ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds = '2024-01-02')";
+OperationHandle alterHandle =
+service.executeStatement(sessionHandle, alterStatement, -1, new Configuration());
+awaitOperationTermination(service, sessionHandle, alterHandle);
+List<RowData> result = fetchAllResults(service, sessionHandle, alterHandle);
+assertThat(result.size()).isEqualTo(1);
+String jobId = result.get(0).getString(0).toString();
+
+MiniCluster miniCluster = MINI_CLUSTER.getMiniCluster();

Review Comment:
   Got it!
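
The test above waits for the continuous refresh job by calling `CommonTestUtils.waitUtil(condition, timeout, pause, message)`, re-checking the row count every 2 seconds for at most 20 seconds. A minimal stand-alone sketch of that poll-until-true pattern follows. The class `PollUntil` and its `waitUntil` method are hypothetical and say nothing about how Flink's own helper is implemented; signalling a timeout with `IllegalStateException` is an assumption of this example.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper illustrating the poll-until-true pattern; not Flink's API.
final class PollUntil {
    private PollUntil() {}

    /**
     * Re-checks {@code condition} every {@code pause} until it returns true.
     * Throws IllegalStateException(errorMsg) once {@code timeout} has elapsed.
     */
    static void waitUntil(
            BooleanSupplier condition, Duration timeout, Duration pause, String errorMsg) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            // Overflow-safe comparison of System.nanoTime() values.
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException(errorMsg);
            }
            try {
                Thread.sleep(pause.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag before giving up.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting: " + errorMsg, e);
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Becomes true after about 100 ms, well within the 2 s timeout.
        waitUntil(
                () -> System.currentTimeMillis() - start >= 100,
                Duration.ofSeconds(2),
                Duration.ofMillis(10),
                "condition never held");
        System.out.println("condition held");
        try {
            waitUntil(() -> false, Duration.ofMillis(50), Duration.ofMillis(10), "gave up");
        } catch (IllegalStateException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

Checking the condition before each sleep means an already-satisfied condition returns immediately, and comparing `System.nanoTime()` differences keeps the deadline check immune to wall-clock adjustments.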






Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-11 Thread via GitHub


xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1597376170


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
 "Submit continuous refresh job for materialized table {} occur exception.",
 materializedTableIdentifier,
 e);
-throw new TableException(
+throw new SqlExecutionException(
 String.format(
 "Submit continuous refresh job for materialized table %s occur exception.",
 materializedTableIdentifier),
 e);
 }
 }
 
+private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) {
+ObjectIdentifier materializedTableIdentifier =
+alterMaterializedTableRefreshOperation.getTableIdentifier();
+ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier);
+if (MATERIALIZED_TABLE != table.getTableKind()) {
+throw new ValidationException(
+String.format(
+"The table '%s' is not a materialized table.",
+materializedTableIdentifier));
+}
+
+ResolvedCatalogMaterializedTable materializedTable =
+(ResolvedCatalogMaterializedTable) table;
+
+Map<String, String> partitionSpec =
+alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table);
+
+// Set job name, runtime mode
+Configuration customConfig = new Configuration();
+String jobName =
+String.format(
+"Materialized_table_%s_one_time_refresh_job",
+materializedTableIdentifier.asSerializableString());
+customConfig.set(NAME, jobName);
+customConfig.set(RUNTIME_MODE, BATCH);
+
+StringBuilder insertStatement =
+new StringBuilder(
+String.format(
+"INSERT OVERWRITE %s SELECT * FROM (%s)",
+materializedTableIdentifier,
+materializedTable.getDefinitionQuery()));
+
+if (!partitionSpec.isEmpty()) {
+insertStatement.append(" WHERE ");
+insertStatement.append(
+partitionSpec.entrySet().stream()
+.map(
+entry ->
+String.format(
+"%s = '%s'", entry.getKey(), entry.getValue()))
+.reduce((s1, s2) -> s1 + " AND " + s2)
+.get());
+}
+
+try {
+LOG.debug(
+"Begin to manually refreshing the materialization table {}, statement: {}",
+materializedTableIdentifier,
+insertStatement);
+return operationExecutor.executeStatement(
+handle, customConfig, insertStatement.toString());
+} catch (Exception e) {
+// log and throw exception
+LOG.error(
+"Manually refreshing the materialization table {} occur exception.",
+materializedTableIdentifier,
+e);
+throw new SqlExecutionException(
+String.format(
+"Manually refreshing the materialization table %s occur exception.",
+materializedTableIdentifier),
+e);
+}
+}
+
+private static void validatePartitionSpec(
+Map<String, String> partitionSpec, ResolvedCatalogMaterializedTable table) {
+ResolvedSchema schema = table.getResolvedSchema();
+Set<String> allPartitionKeys = new HashSet<>(table.getPartitionKeys());
+
+Set<String> unknownPartitionKeys = new HashSet<>();
+Set<String> nonStringPartitionKeys = new HashSet<>();
+
+for (String partitionKey : partitionSpec.keySet()) {
+if (!schema.getColumn(partitionKey).isPresent()) {
+unknownPartitionKeys.add(partitionKey);
+continue;
+}
+
+if (!schema.getColumn(partitionKey)
+.get()
+.getDataType()
+.getLogicalType()
+.getTypeRoot()
+.getFamilies()
+.contains(LogicalTypeFamil
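
The `validatePartitionSpec` hunk is cut off in this archive, but its visible part collects `unknownPartitionKeys` (keys with no matching schema column) and `nonStringPartitionKeys` before deciding anything. The sketch below illustrates that collect-then-fail idea with plain Java types. It is an illustration under stated assumptions, not the PR's code: the schema is modelled as a hypothetical column-name-to-type-name map, the set of "string" type names is assumed, a key is also treated as unknown if it is not a declared partition key (as the earlier revision of this method did), and `IllegalArgumentException` stands in for Flink's `ValidationException`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical validator sketch; names and messages are not Flink's.
final class PartitionSpecValidator {
    private PartitionSpecValidator() {}

    // Assumed names of string-like column types (the PR inspects the logical type family).
    private static final Set<String> STRING_TYPES = Set.of("CHAR", "VARCHAR", "STRING");

    /**
     * @param columnTypes   hypothetical stand-in for the resolved schema: column name to type name
     * @param partitionKeys the table's declared partition keys
     * @param partitionSpec keys and values from PARTITION (k = 'v', ...)
     */
    static void validate(
            Map<String, String> columnTypes,
            Set<String> partitionKeys,
            Map<String, String> partitionSpec) {
        Set<String> unknownKeys = new TreeSet<>(); // sorted for stable error messages
        Set<String> nonStringKeys = new TreeSet<>();
        for (String key : partitionSpec.keySet()) {
            String type = columnTypes.get(key);
            if (type == null || !partitionKeys.contains(key)) {
                unknownKeys.add(key);
                continue;
            }
            if (!STRING_TYPES.contains(type)) {
                nonStringKeys.add(key);
            }
        }
        if (!unknownKeys.isEmpty()) {
            throw new IllegalArgumentException(
                    "Unknown partition keys " + unknownKeys
                            + "; declared partition keys are " + new TreeSet<>(partitionKeys));
        }
        if (!nonStringKeys.isEmpty()) {
            throw new IllegalArgumentException(
                    "Only string-typed partition keys are supported, got " + nonStringKeys);
        }
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("user_id", "BIGINT");
        columns.put("shop_id", "BIGINT");
        columns.put("ds", "STRING");
        Set<String> partitionKeys = Set.of("ds", "shop_id"); // hypothetical table layout

        validate(columns, partitionKeys, Map.of("ds", "2024-01-02")); // accepted
        for (Map<String, String> spec : List.of(Map.of("dt", "2024-01-02"), Map.of("shop_id", "1"))) {
            try {
                validate(columns, partitionKeys, spec);
            } catch (IllegalArgumentException e) {
                System.out.println(e.getMessage());
            }
        }
    }
}
```

Collecting all offending keys before throwing lets one error message report every bad key in the `PARTITION (...)` clause instead of only the first.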

Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-10 Thread via GitHub


xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1597372269


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
 "Submit continuous refresh job for materialized table {} occur exception.",
 materializedTableIdentifier,
 e);
-throw new TableException(
+throw new SqlExecutionException(
 String.format(
 "Submit continuous refresh job for materialized table %s occur exception.",
 materializedTableIdentifier),
 e);
 }
 }
 
+private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) {
+ObjectIdentifier materializedTableIdentifier =
+alterMaterializedTableRefreshOperation.getTableIdentifier();
+ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier);
+if (MATERIALIZED_TABLE != table.getTableKind()) {
+throw new ValidationException(
+String.format(
+"The table '%s' is not a materialized table.",
+materializedTableIdentifier));
+}
+
+ResolvedCatalogMaterializedTable materializedTable =
+(ResolvedCatalogMaterializedTable) table;
+
+Map<String, String> partitionSpec =
+alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table);
+
+// Set job name, runtime mode
+Configuration customConfig = new Configuration();
+String jobName =
+String.format(
+"Materialized_table_%s_one_time_refresh_job",
+materializedTableIdentifier.asSerializableString());
+customConfig.set(NAME, jobName);
+customConfig.set(RUNTIME_MODE, BATCH);
+
+StringBuilder insertStatement =
+new StringBuilder(
+String.format(
+"INSERT OVERWRITE %s SELECT * FROM (%s)",
+materializedTableIdentifier,
+materializedTable.getDefinitionQuery()));
+
+if (!partitionSpec.isEmpty()) {
+insertStatement.append(" WHERE ");
+insertStatement.append(
+partitionSpec.entrySet().stream()
+.map(
+entry ->
+String.format(
+"%s = '%s'", entry.getKey(), entry.getValue()))
+.reduce((s1, s2) -> s1 + " AND " + s2)
+.get());
+}
+
+try {
+LOG.debug(
+"Begin to manually refreshing the materialization table {}, statement: {}",
+materializedTableIdentifier,
+insertStatement);
+return operationExecutor.executeStatement(
+handle, customConfig, insertStatement.toString());
+} catch (Exception e) {
+// log and throw exception
+LOG.error(
+"Manually refreshing the materialization table {} occur exception.",
+materializedTableIdentifier,
+e);
+throw new SqlExecutionException(
+String.format(
+"Manually refreshing the materialization table %s occur exception.",
+materializedTableIdentifier),
+e);
+}
+}
+
+private static void validatePartitionSpec(
+Map<String, String> partitionSpec, ResolvedCatalogMaterializedTable table) {
+ResolvedSchema schema = table.getResolvedSchema();
+Set<String> allPartitionKeys = new HashSet<>(table.getPartitionKeys());
+
+Set<String> unknownPartitionKeys = new HashSet<>();
+Set<String> nonStringPartitionKeys = new HashSet<>();
+
+for (String partitionKey : partitionSpec.keySet()) {
+if (!schema.getColumn(partitionKey).isPresent()) {
+unknownPartitionKeys.add(partitionKey);
+continue;
+}
+
+if (!schema.getColumn(partitionKey)
+.get()
+.getDataType()
+.getLogicalType()
+.getTypeRoot()
+.getFamilies()
+.contains(LogicalTypeFamil

Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-10 Thread via GitHub


lsyldliu commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1597329094


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
 "Submit continuous refresh job for materialized table {} occur exception.",
 materializedTableIdentifier,
 e);
-throw new TableException(
+throw new SqlExecutionException(
 String.format(
 "Submit continuous refresh job for materialized table %s occur exception.",
 materializedTableIdentifier),
 e);
 }
 }
 
+private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) {
+ObjectIdentifier materializedTableIdentifier =
+alterMaterializedTableRefreshOperation.getTableIdentifier();
+ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier);
+if (MATERIALIZED_TABLE != table.getTableKind()) {
+throw new ValidationException(
+String.format(
+"The table '%s' is not a materialized table.",
+materializedTableIdentifier));
+}
+
+ResolvedCatalogMaterializedTable materializedTable =
+(ResolvedCatalogMaterializedTable) table;
+
+Map<String, String> partitionSpec =
+alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table);
+
+// Set job name, runtime mode
+Configuration customConfig = new Configuration();
+String jobName =
+String.format(
+"Materialized_table_%s_one_time_refresh_job",
+materializedTableIdentifier.asSerializableString());
+customConfig.set(NAME, jobName);
+customConfig.set(RUNTIME_MODE, BATCH);
+
+StringBuilder insertStatement =
+new StringBuilder(
+String.format(
+"INSERT OVERWRITE %s SELECT * FROM (%s)",
+materializedTableIdentifier,
+materializedTable.getDefinitionQuery()));
+
+if (!partitionSpec.isEmpty()) {
+insertStatement.append(" WHERE ");
+insertStatement.append(
+partitionSpec.entrySet().stream()
+.map(
+entry ->
+String.format(
+"%s = '%s'", entry.getKey(), entry.getValue()))
+.reduce((s1, s2) -> s1 + " AND " + s2)
+.get());
+}
+
+try {
+LOG.debug(
+"Begin to manually refreshing the materialization table {}, statement: {}",
+materializedTableIdentifier,
+insertStatement);
+return operationExecutor.executeStatement(
+handle, customConfig, insertStatement.toString());
+} catch (Exception e) {
+// log and throw exception
+LOG.error(
+"Manually refreshing the materialization table {} occur exception.",
+materializedTableIdentifier,
+e);
+throw new SqlExecutionException(
+String.format(
+"Manually refreshing the materialization table %s occur exception.",
+materializedTableIdentifier),
+e);
+}
+}
+
+private static void validatePartitionSpec(
+Map<String, String> partitionSpec, ResolvedCatalogMaterializedTable table) {
+ResolvedSchema schema = table.getResolvedSchema();
+Set<String> allPartitionKeys = new HashSet<>(table.getPartitionKeys());
+
+Set<String> unknownPartitionKeys = new HashSet<>();
+Set<String> nonStringPartitionKeys = new HashSet<>();
+
+for (String partitionKey : partitionSpec.keySet()) {
+if (!schema.getColumn(partitionKey).isPresent()) {
+unknownPartitionKeys.add(partitionKey);
+continue;
+}
+
+if (!schema.getColumn(partitionKey)
+.get()
+.getDataType()
+.getLogicalType()
+.getTypeRoot()
+.getFamilies()
+.contains(LogicalTypeFamily.C

Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-10 Thread via GitHub


lsyldliu commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1597326389


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
 "Submit continuous refresh job for materialized table {} occur exception.",
 materializedTableIdentifier,
 e);
-throw new TableException(
+throw new SqlExecutionException(
 String.format(
 "Submit continuous refresh job for materialized table %s occur exception.",
 materializedTableIdentifier),
 e);
 }
 }
 
+private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) {
+ObjectIdentifier materializedTableIdentifier =
+alterMaterializedTableRefreshOperation.getTableIdentifier();
+ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier);
+if (MATERIALIZED_TABLE != table.getTableKind()) {
+throw new ValidationException(
+String.format(
+"The table '%s' is not a materialized table.",
+materializedTableIdentifier));
+}
+
+ResolvedCatalogMaterializedTable materializedTable =
+(ResolvedCatalogMaterializedTable) table;
+
+Map<String, String> partitionSpec =
+alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table);
+
+// Set job name, runtime mode
+Configuration customConfig = new Configuration();
+String jobName =
+String.format(
+"Materialized_table_%s_one_time_refresh_job",
+materializedTableIdentifier.asSerializableString());
+customConfig.set(NAME, jobName);
+customConfig.set(RUNTIME_MODE, BATCH);
+
+StringBuilder insertStatement =
+new StringBuilder(
+String.format(
+"INSERT OVERWRITE %s SELECT * FROM (%s)",
+materializedTableIdentifier,
+materializedTable.getDefinitionQuery()));
+
+if (!partitionSpec.isEmpty()) {
+insertStatement.append(" WHERE ");

Review Comment:
   I think the WHERE clause could be better formatted on a new line
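
One way to act on this suggestion, sketched with plain JDK types: build the `INSERT OVERWRITE ... SELECT * FROM (...)` statement with each clause on its own line, and join the partition predicates with `Collectors.joining(" AND ")`, which avoids the `reduce(...).get()` Optional unwrapping in the diff. The class and method names are hypothetical; the single-quote doubling in `escape` is an addition not present in the PR, and partition keys are emitted unquoted, as in the diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical builder sketch for the one-time refresh statement.
final class RefreshStatementBuilder {
    private RefreshStatementBuilder() {}

    static String build(
            String tableIdentifier, String definitionQuery, Map<String, String> partitionSpec) {
        StringBuilder sql =
                new StringBuilder()
                        .append("INSERT OVERWRITE ").append(tableIdentifier).append('\n')
                        .append("SELECT * FROM (").append(definitionQuery).append(')');
        if (!partitionSpec.isEmpty()) {
            // joining(" AND ") yields a String directly; no Optional to unwrap.
            String predicate =
                    partitionSpec.entrySet().stream()
                            .map(e -> String.format("%s = '%s'", e.getKey(), escape(e.getValue())))
                            .collect(Collectors.joining(" AND "));
            // WHERE goes on its own line, as the review comment suggests.
            sql.append('\n').append("WHERE ").append(predicate);
        }
        return sql.toString();
    }

    // Doubles embedded single quotes so a value such as O'Brien stays a valid SQL literal.
    private static String escape(String value) {
        return value.replace("'", "''");
    }

    public static void main(String[] args) {
        Map<String, String> spec = new LinkedHashMap<>(); // keeps predicate order stable
        spec.put("ds", "2024-01-02");
        // Hypothetical identifier and query, for illustration only.
        System.out.println(build("`cat`.`db`.`my_materialized_table`", "SELECT ...", spec));
    }
}
```

With a spec of `ds = '2024-01-02'`, the statement ends with the line `WHERE ds = '2024-01-02'`.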



##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
 "Submit continuous refresh job for materialized table {} occur exception.",
 materializedTableIdentifier,
 e);
-throw new TableException(
+throw new SqlExecutionException(
 String.format(
 "Submit continuous refresh job for materialized table %s occur exception.",
 materializedTableIdentifier),
 e);
 }
 }
 
+private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) {
+ObjectIdentifier materializedTableIdentifier =
+alterMaterializedTableRefreshOperation.getTableIdentifier();
+ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier);
+if (MATERIALIZED_TABLE != table.getTableKind()) {
+throw new ValidationException(
+String.format(
+"The table '%s' is not a materialized table.",
+materializedTableIdentifier));
+}
+
+ResolvedCatalogMaterializedTable materializedTable =
+(ResolvedCatalogMaterializedTable) table;
+
+Map<String, String> partitionSpec =
+alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table);
+
+// Set job name, runtime mode
+Configuration customConfig = new Configuration();
+String jobName =
+String.format(
+"Materialized_table_%s_one_time_refresh_job",
+materializedTableIdentifier.asSerializableSt

Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-10 Thread via GitHub


xuyangzhong commented on PR #24760:
URL: https://github.com/apache/flink/pull/24760#issuecomment-2104649932

   Hi @lsyldliu, all comments have been addressed. Could you take another look?





Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-10 Thread via GitHub


xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1596779798


##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##
@@ -236,6 +241,80 @@ void testCreateMaterializedTableInFullMode() {
 "Only support create materialized table in continuous refresh mode currently.");
 }
 
+@Test
+void testAlterMaterializedTableRefresh() throws Exception {
+// initialize session handle, create test-filesystem catalog and register it to catalog
+// store
+SessionHandle sessionHandle = initializeSession();
+
+String materializedTableDDL =
+"CREATE MATERIALIZED TABLE users_shops"
++ " PARTITIONED BY (ds)\n"
++ " WITH(\n"
++ "   'format' = 'debezium-json'\n"
++ " )\n"
++ " FRESHNESS = INTERVAL '30' SECOND\n"
++ " AS SELECT \n"
++ "  user_id,\n"
++ "  shop_id,\n"
++ "  ds,\n"

Review Comment:
   Use the values source to check that the data is overwritten.
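
Why a values source helps here: an earlier revision of the refresh method generated `INSERT INTO`, while the later one generates `INSERT OVERWRITE`, and the difference only becomes visible when the test changes the source data between refreshes. The toy model below (a hypothetical in-memory partition map, not a Flink API) replays the data of the revised `testAlterMaterializedTableRefresh`: three source rows, two of them in partition `2024-01-02`. After the `(3, 3, 3, '2024-01-02')` row is removed and that partition is refreshed, an overwrite leaves one row in it, whereas an append would leave three.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a partitioned sink; rows are "user_id,shop_id,order_cnt".
final class PartitionRefreshModel {
    private final Map<String, List<String>> partitions = new LinkedHashMap<>();

    // INSERT INTO semantics: append to whatever the partition already holds.
    void insertInto(String ds, List<String> rows) {
        partitions.computeIfAbsent(ds, k -> new ArrayList<>()).addAll(rows);
    }

    // INSERT OVERWRITE semantics: replace the partition's contents.
    void insertOverwrite(String ds, List<String> rows) {
        partitions.put(ds, new ArrayList<>(rows));
    }

    int size(String ds) {
        return partitions.getOrDefault(ds, List.of()).size();
    }

    public static void main(String[] args) {
        PartitionRefreshModel overwrite = new PartitionRefreshModel();
        PartitionRefreshModel append = new PartitionRefreshModel();
        // Initial refresh: one aggregated row per (user_id, shop_id, ds).
        for (PartitionRefreshModel m : List.of(overwrite, append)) {
            m.insertInto("2024-01-01", List.of("1,1,1"));
            m.insertInto("2024-01-02", List.of("2,2,1", "3,3,1"));
        }
        // Source row (3, 3, 3, '2024-01-02') removed; partition 2024-01-02 recomputed.
        List<String> recomputed = List.of("2,2,1");
        overwrite.insertOverwrite("2024-01-02", recomputed);
        append.insertInto("2024-01-02", recomputed);
        System.out.println(overwrite.size("2024-01-02")); // 1
        System.out.println(append.size("2024-01-02")); // 3
    }
}
```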






Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-09 Thread via GitHub


xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1594070565


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
 }
 }
 
+private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) {
+ObjectIdentifier materializedTableIdentifier =
+alterMaterializedTableRefreshOperation.getTableIdentifier();
+ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier);
+if (!(table instanceof ResolvedCatalogMaterializedTable)) {
+throw new TableException(
+String.format(
+"The table '%s' is not a materialized table.",
+materializedTableIdentifier));
+}
+
+ResolvedCatalogMaterializedTable materializedTable =
+(ResolvedCatalogMaterializedTable) table;
+
+Map<String, String> partitionSpec =
+alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+Set<String> allPartitionKeys =
+new HashSet<>(((ResolvedCatalogMaterializedTable) table).getPartitionKeys());
+Set<String> unknownPartitionKeys = new HashSet<>(partitionSpec.keySet());
+unknownPartitionKeys.removeAll(allPartitionKeys);
+if (!unknownPartitionKeys.isEmpty()) {
+throw new TableException(
+String.format(
+"The partition spec contains unknown partition keys: %s.",
+unknownPartitionKeys));
+}
+
+// Set job name, runtime mode, checkpoint interval
+Configuration customConfig = new Configuration();
+String jobName =
+String.format(
+"Materialized_table_%s_one_time_refresh_job",
+materializedTableIdentifier.asSerializableString());
+customConfig.set(NAME, jobName);
+customConfig.set(RUNTIME_MODE, BATCH);
+
+StringBuilder insertStatement =
+new StringBuilder(
+String.format(
+"INSERT INTO %s SELECT * FROM (%s)",
+materializedTableIdentifier,
+materializedTable.getDefinitionQuery()));
+
+if (!partitionSpec.isEmpty()) {
+insertStatement.append(" WHERE ");
+insertStatement.append(
+partitionSpec.entrySet().stream()
+.map(
+entry ->
+String.format(
+"%s = '%s'", entry.getKey(), entry.getValue()))
+.reduce((s1, s2) -> s1 + " AND " + s2)
+.orElseThrow(() -> new TableException("Could not happen")));
+}
+
+try {
+// return jobId for one time refresh, user should get the refresh job info via desc

Review Comment:
   I added it to let developers know in advance what the "alter materialized table ... refresh ..." syntax returns. I'll delete it.
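For context, the one-time refresh job this comment refers to is named from the table identifier (see `jobName` in the diff). A minimal pure-Java sketch of that name follows. It assumes, based on my reading of Flink's `ObjectIdentifier#asSerializableString`, that the identifier renders as three backtick-quoted parts joined by dots, with any backtick inside a part doubled; the class `RefreshJobNames` is hypothetical.

```java
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper reproducing the job-name format from the diff.
// The identifier rendering below is an ASSUMPTION about
// ObjectIdentifier#asSerializableString, not a copy of Flink code.
class RefreshJobNames {
    // Wrap one identifier part in backticks, doubling embedded backticks.
    static String quote(String part) {
        return "`" + part.replace("`", "``") + "`";
    }

    // catalog.database.table, each part quoted.
    static String serializable(String catalog, String database, String table) {
        return Stream.of(catalog, database, table)
                .map(RefreshJobNames::quote)
                .collect(Collectors.joining("."));
    }

    // Same pattern as the String.format call in the diff.
    static String oneTimeRefreshJobName(String catalog, String database, String table) {
        return String.format(
                "Materialized_table_%s_one_time_refresh_job",
                serializable(catalog, database, table));
    }
}
```

Quoting each part keeps the name unambiguous even when a catalog, database, or table name itself contains a dot.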




Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-09 Thread via GitHub


xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1595409695


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
 }
 }
 
+private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) {
+ObjectIdentifier materializedTableIdentifier =
+alterMaterializedTableRefreshOperation.getTableIdentifier();
+ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier);
+if (!(table instanceof ResolvedCatalogMaterializedTable)) {
+throw new TableException(
+String.format(
+"The table '%s' is not a materialized table.",
+materializedTableIdentifier));
+}
+
+ResolvedCatalogMaterializedTable materializedTable =
+(ResolvedCatalogMaterializedTable) table;
+
+Map<String, String> partitionSpec =
+alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+Set<String> allPartitionKeys =
+new HashSet<>(((ResolvedCatalogMaterializedTable) table).getPartitionKeys());
+Set<String> unknownPartitionKeys = new HashSet<>(partitionSpec.keySet());
+unknownPartitionKeys.removeAll(allPartitionKeys);
+if (!unknownPartitionKeys.isEmpty()) {

Review Comment:
   Let's go with option 1. We can support only the STRING type at first.
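Assuming "option 1" means restricting the refresh partition spec to STRING-typed partition columns, the validation could be sketched as below. The input shape (a map from partition column name to a type name) and the class `PartitionSpecValidator` are hypothetical stand-ins for the resolved schema; the unknown-key check mirrors the one in the diff.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical validation sketch; not the Flink implementation.
class PartitionSpecValidator {
    /**
     * @param columnTypes   partition column name -> SQL type name (assumed input shape)
     * @param partitionSpec user-supplied partition key -> value
     * @throws IllegalArgumentException if a key is unknown or not of STRING type
     */
    static void validate(Map<String, String> columnTypes, Map<String, String> partitionSpec) {
        // Same idea as the diff: keys in the spec minus the known partition keys.
        // TreeSet keeps the error message order deterministic.
        Set<String> unknown = new TreeSet<>(partitionSpec.keySet());
        unknown.removeAll(columnTypes.keySet());
        if (!unknown.isEmpty()) {
            throw new IllegalArgumentException(
                    "The partition spec contains unknown partition keys: " + unknown + ".");
        }
        // Option 1 (assumed): only STRING partition columns may be refreshed for now.
        Set<String> nonString = new TreeSet<>();
        for (String key : partitionSpec.keySet()) {
            if (!"STRING".equalsIgnoreCase(columnTypes.get(key))) {
                nonString.add(key);
            }
        }
        if (!nonString.isEmpty()) {
            throw new IllegalArgumentException(
                    "Only STRING partition keys are supported currently: " + nonString + ".");
        }
    }
}
```

Comparing type names is a simplification: in Flink SQL `STRING` is a synonym for `VARCHAR(2147483647)`, so a real check would more likely inspect the column's logical type than its printed name.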




Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-08 Thread via GitHub


xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1594070565


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
 }
 }
 
+private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) {
+ObjectIdentifier materializedTableIdentifier =
+alterMaterializedTableRefreshOperation.getTableIdentifier();
+ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier);
+if (!(table instanceof ResolvedCatalogMaterializedTable)) {
+throw new TableException(
+String.format(
+"The table '%s' is not a materialized table.",
+materializedTableIdentifier));
+}
+
+ResolvedCatalogMaterializedTable materializedTable =
+(ResolvedCatalogMaterializedTable) table;
+
+Map<String, String> partitionSpec =
+alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+Set<String> allPartitionKeys =
+new HashSet<>(((ResolvedCatalogMaterializedTable) table).getPartitionKeys());
+Set<String> unknownPartitionKeys = new HashSet<>(partitionSpec.keySet());
+unknownPartitionKeys.removeAll(allPartitionKeys);
+if (!unknownPartitionKeys.isEmpty()) {
+throw new TableException(
+String.format(
+"The partition spec contains unknown partition keys: %s.",
+unknownPartitionKeys));
+}
+
+// Set job name, runtime mode, checkpoint interval
+Configuration customConfig = new Configuration();
+String jobName =
+String.format(
+"Materialized_table_%s_one_time_refresh_job",
+materializedTableIdentifier.asSerializableString());
+customConfig.set(NAME, jobName);
+customConfig.set(RUNTIME_MODE, BATCH);
+
+StringBuilder insertStatement =
+new StringBuilder(
+String.format(
+"INSERT INTO %s SELECT * FROM (%s)",
+materializedTableIdentifier,
+materializedTable.getDefinitionQuery()));
+
+if (!partitionSpec.isEmpty()) {
+insertStatement.append(" WHERE ");
+insertStatement.append(
+partitionSpec.entrySet().stream()
+.map(
+entry ->
+String.format(
+"%s = '%s'", entry.getKey(), entry.getValue()))
+.reduce((s1, s2) -> s1 + " AND " + s2)
+.orElseThrow(() -> new TableException("Could not happen")));
+}
+
+try {
+// return jobId for one time refresh, user should get the refresh job info via desc

Review Comment:
   I added it to let developers know in advance what the "alter materialized table ... refresh ..." syntax returns. I'll delete it.
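The statement assembled in the diff can be reproduced in isolation. The sketch below is hypothetical (`RefreshStatementBuilder` is not Flink code) and keeps the diff's `INSERT INTO ... SELECT * FROM (...) WHERE ...` shape, with two deliberate deviations: it uses `Collectors.joining(" AND ")` instead of `reduce(...).orElseThrow(...)`, and it doubles single quotes in partition values, which is how standard SQL escapes a quote inside a string literal.

```java
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of the one-time refresh statement built in the diff.
class RefreshStatementBuilder {
    static String build(
            String tableIdentifier, String definitionQuery, Map<String, String> partitionSpec) {
        StringBuilder sql =
                new StringBuilder(
                        String.format(
                                "INSERT INTO %s SELECT * FROM (%s)",
                                tableIdentifier, definitionQuery));
        if (!partitionSpec.isEmpty()) {
            sql.append(" WHERE ")
                    .append(
                            partitionSpec.entrySet().stream()
                                    // Values become quoted string literals (STRING keys only);
                                    // embedded single quotes are doubled.
                                    .map(e -> String.format(
                                            "%s = '%s'",
                                            e.getKey(),
                                            e.getValue().replace("'", "''")))
                                    .collect(Collectors.joining(" AND ")));
        }
        return sql.toString();
    }
}
```

With `joining`, the empty-spec case is already handled by the surrounding `isEmpty()` check, so the "Could not happen" branch disappears. Predicate order follows the map's iteration order, so an insertion-ordered map gives a stable statement.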




Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-08 Thread via GitHub


xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1594064792


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
 }
 }
 
+private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) {
+ObjectIdentifier materializedTableIdentifier =
+alterMaterializedTableRefreshOperation.getTableIdentifier();
+ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier);
+if (!(table instanceof ResolvedCatalogMaterializedTable)) {
+throw new TableException(

Review Comment:
   Got it!




Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-08 Thread via GitHub


xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1594018523


##
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java:
##
@@ -58,4 +58,9 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
 writer, getOperator().getLeftPrec(), getOperator().getRightPrec());
 }
 }
+
+@Nullable

Review Comment:
   I just noticed that `partitionSpec` is annotated with `@Nullable` in the constructor. I'll update it.
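Because the PARTITION clause is optional, the parsed spec can be absent. Two common ways to surface that to callers are sketched below; `RefreshNode` is a hypothetical stand-in, and a plain `Map` replaces the parser's own representation of the clause.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a parsed ALTER MATERIALIZED TABLE ... REFRESH node.
class RefreshNode {
    // null when the statement has no PARTITION clause
    private final Map<String, String> partitionSpec;

    RefreshNode(Map<String, String> partitionSpec) {
        this.partitionSpec = partitionSpec;
    }

    // Option A: make the absence explicit in the return type.
    Optional<Map<String, String>> getPartitionSpec() {
        return Optional.ofNullable(partitionSpec);
    }

    // Option B: normalize null to an empty, unmodifiable map.
    Map<String, String> getPartitionSpecOrEmpty() {
        return partitionSpec == null
                ? Collections.<String, String>emptyMap()
                : Collections.unmodifiableMap(new LinkedHashMap<>(partitionSpec));
    }
}
```

Returning an empty map keeps call sites such as `partitionSpec.isEmpty()` in the diff free of null checks, while `Optional` makes the absence visible in the signature.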


