JingsongLi commented on code in PR #540:
URL: https://github.com/apache/flink-table-store/pull/540#discussion_r1108235271


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/InsertChangesAction.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.action;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.store.connector.sink.FlinkSinkBuilder;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.types.RowKind;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.store.connector.action.Action.getTablePath;
+
+/**
+ * Insert changes from the given query into a table. The run() method works in the following steps:
+ *
+ * <ul>
+ *   <li>Get {@link Table} queriedTable from the given query.
+ *   <li>Transform the queriedTable to a changelog {@link DataStream} of internal {@link RowData}.
+ *       The row kind is extracted from the first column, and the remaining fields are taken from
+ *       the rest of the original row.
+ *   <li>Build a sink from the data stream and the target table, then execute.
+ * </ul>
+ */
+public class InsertChangesAction extends ActionBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InsertChangesAction.class);
+
+    private final String query;
+
+    InsertChangesAction(String warehouse, String database, String tableName, String query) {
+        super(warehouse, database, tableName);
+        if (((FileStoreTable) table).schema().primaryKeys().isEmpty()) {
+            throw new UnsupportedOperationException(
+                    "insert-changes action doesn't support tables with no primary keys defined.");
+        }
+        this.query = query;
+    }
+
+    public static Optional<Action> create(String[] args) {
+        LOG.info("insert-changes job args: {}", String.join(" ", args));
+
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+
+        if (params.has("help")) {
+            printHelp();
+            return Optional.empty();
+        }
+
+        Tuple3<String, String, String> tablePath = getTablePath(params);
+        if (tablePath == null) {
+            return Optional.empty();
+        }
+
+        String query = params.get("query");
+        if (query == null) {
+            return Optional.empty();
+        }
+
+        InsertChangesAction action =
+                new InsertChangesAction(tablePath.f0, tablePath.f1, tablePath.f2, query);
+
+        return Optional.of(action);
+    }
+
+    private static void printHelp() {
+        System.out.println("Action \"insert-changes\" applies changes from the given query to a table.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  insert-changes --warehouse <warehouse-path> --database <database-name> "
+                        + "--table <table-name> --query <query-statement>");
+        System.out.println("  insert-changes --path <table-path> --query <query-statement>");
+        System.out.println();
+
+        System.out.println("Note: ");
+        System.out.println(
+                "  1. the first column of the <query-statement> result must be a RowKind string:");
+        System.out.println("    +I: newly added row");
+        System.out.println("    +U: updated row with new content");
+        System.out.println("    -D: deleted row");
+        System.out.println(
+                "  2. make sure the schema of the query result, after discarding the first column, is the same as that of <table-name>");
+
+        System.out.println();
+
+        System.out.println("Examples:");
+        System.out.println(
+                "  insert-changes --warehouse hdfs:///path/to/warehouse --database test_db --table test_table --query SELECT '-D', k, v FROM test_table");
+        System.out.println(
+                "  It will insert all records from test_table into test_table with RowKind '-D'");
+    }
+
+    @Override
+    public void run() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv =
+                StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());
+
+        registerAndUseCatalog(tEnv);
+
+        Table queriedTable = tEnv.sqlQuery(query);
+
+        List<DataStructureConverter<Object, Object>> converters =
+                queriedTable.getResolvedSchema().getColumnDataTypes().stream()
+                        .map(DataStructureConverters::getConverter)
+                        .collect(Collectors.toList());
+
+        DataStream<RowData> dataStream =
+                tEnv.toChangelogStream(queriedTable)
+                        .map(
+                                row -> {
+                                    RowKind kind;
+                                    try {
+                                        String shortString = row.getFieldAs(0);
+                                        switch (shortString) {
+                                            case "+I":
+                                                kind = RowKind.INSERT;
+                                                break;
+                                            case "+U":
+                                                kind = RowKind.UPDATE_AFTER;
+                                                break;
+                                            case "-D":
+                                                kind = RowKind.DELETE;
+                                                break;
+                                            default:
+                                                throw new UnsupportedOperationException(
+                                                        String.format(
+                                                                "Row kind '%s' of row '%s' is not supported in insert-changes action.",
+                                                                shortString, row));
+                                        }
+                                    } catch (Throwable t) {
+                                        LOG.error(
+                                                String.format(
+                                                        "Wrong schema of row '%s' in insert-changes action.",
+                                                        row),
+                                                t);
+                                        throw new RuntimeException(
+                                                String.format(
+                                                        "Wrong schema of row '%s' in insert-changes action.",
+                                                        row),
+                                                t);
+                                    }
+
+                                    int arity = row.getArity();
+                                    GenericRowData rowData = new GenericRowData(kind, arity);
+                                    for (int i = 1; i < arity; i++) {
+                                        rowData.setField(
+                                                i - 1,
+                                                converters
+                                                        .get(i)
+                                                        .toInternalOrNull(row.getField(i)));
+                                    }
+                                    return rowData;
+                                });
+
+        new FlinkSinkBuilder((FileStoreTable) table).withInput(dataStream).build();

Review Comment:
   Missing `withLockFactory()` from the catalog lock.
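   
   A rough sketch of how the suggested builder call might look. Only `withLockFactory()` itself comes from this comment; `catalog.lockFactory()`, `Lock.factory(...)`, `Lock.emptyFactory()` and `Identifier` below are assumed names for illustration, not verified against this version of flink-table-store:
   
   ```java
   // Sketch only: apart from withLockFactory(), the helpers used here
   // (catalog.lockFactory(), Lock.factory, Lock.emptyFactory, Identifier)
   // are assumptions for illustration.
   Lock.Factory lockFactory =
           catalog.lockFactory()
                   .map(factory -> Lock.factory(factory, new Identifier(database, tableName)))
                   .orElse(Lock.emptyFactory());
   
   new FlinkSinkBuilder((FileStoreTable) table)
           .withInput(dataStream)
           // pass the catalog lock to the sink so commits are protected by it
           .withLockFactory(lockFactory)
           .build();
   ```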


