PHILO-HE commented on code in PR #11300:
URL: 
https://github.com/apache/incubator-gluten/pull/11300#discussion_r2644701600


##########
gluten-flink/planner/src/main/java/org/apache/gluten/velox/FileSystemSinkFactory.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.streaming.api.operators.GlutenOneInputOperatorFactory;
+import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+import org.apache.gluten.util.ReflectUtils;
+
+import io.github.zhztheplayer.velox4j.connector.CommitStrategy;
+import io.github.zhztheplayer.velox4j.connector.FileSystemInsertTableHandle;
+import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.TableWriteNode;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class FileSystemSinkFactory implements VeloxSourceSinkFactory {
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public boolean match(Transformation<RowData> transformation) {
+    if (transformation instanceof SinkTransformation) {
+      SinkTransformation<RowData, RowData> sinkTransformation =
+          (SinkTransformation<RowData, RowData>) transformation;
+      Transformation<RowData> inputTransformation =
+          (Transformation<RowData>) sinkTransformation.getInputs().get(0);
+      if (inputTransformation instanceof OneInputTransformation
+          && inputTransformation.getName().equals("PartitionCommitter")) {
+        OneInputTransformation<RowData, RowData> oneInputTransformatin =
+            (OneInputTransformation<RowData, RowData>) inputTransformation;
+        Transformation<RowData> preInputTransformation =
+            (Transformation<RowData>) oneInputTransformatin.getInputs().get(0);
+        return preInputTransformation.getName().equals("StreamingFileWriter");
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public Transformation<RowData> buildVeloxSource(
+      Transformation<RowData> transformation, Map<String, Object> parameters) {
+    throw new UnsupportedOperationException("Unimplemented method 
'buildVeloxSource'");
+  }
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  @Override
+  public Transformation<RowData> buildVeloxSink(
+      Transformation<RowData> transformation, Map<String, Object> parameters) {
+    SinkTransformation<RowData, RowData> sinkTransformation =
+        (SinkTransformation<RowData, RowData>) transformation;
+    OneInputTransformation<RowData, RowData> partitionCommitTransformation =
+        (OneInputTransformation<RowData, RowData>) 
sinkTransformation.getInputs().get(0);
+    OneInputTransformation<RowData, RowData> fileWriterTransformation =
+        (OneInputTransformation<RowData, RowData>) 
partitionCommitTransformation.getInputs().get(0);
+    OneInputStreamOperator<?, ?> operator = 
fileWriterTransformation.getOperator();
+    List<String> partitionKeys =
+        (List<String>) ReflectUtils.getObjectField(operator.getClass(), 
operator, "partitionKeys");
+    Map<String, String> tableParams = new HashMap<>();
+    try {
+      Class<?> partitionCommitterClazz =
+          
Class.forName("org.apache.flink.connector.file.table.stream.PartitionCommitter");

Review Comment:
   Nit: I assume the try/catch is for this code. It might be better to add a 
helper function in ReflectUtils and remove the try/catch on the caller side.



##########
gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java:
##########
@@ -207,16 +207,23 @@ protected Transformation<Object> createSinkTransformation(
     if (targetRowKind.isPresent()) {
       sinkTransform = applyRowKindSetter(sinkTransform, targetRowKind.get(), 
config);
     }
-
-    return (Transformation<Object>)
-        applySinkProvider(
-            sinkTransform,
-            streamExecEnv,
-            runtimeProvider,
-            rowtimeFieldIndex,
-            sinkParallelism,
-            config,
-            classLoader);
+    Transformation<Object> transformation =

Review Comment:
   Nit: let's add the following comments as hints marking Gluten-specific changes
in Flink source code. Following this convention makes code maintenance
easier.
   ```
   // --- Begin Gluten-specific code changes ---
   // --- End Gluten-specific code changes ---
   ```



##########
gluten-flink/planner/src/main/resources/META-INF/services/org.apache.gluten.velox.VeloxSourceSinkFactory:
##########
@@ -2,3 +2,5 @@ org.apache.gluten.velox.FromElementsSourceFactory
 org.apache.gluten.velox.KafkaSourceSinkFactory
 org.apache.gluten.velox.PrintSinkFactory
 org.apache.gluten.velox.NexmarkSourceFactory
+org.apache.gluten.velox.FileSystemSinkFactory
+org.apache.gluten.velox.FuzzerSourceSinkFactory

Review Comment:
   Nit: add a trailing newline at the end of the file.



##########
gluten-flink/planner/src/main/java/org/apache/gluten/velox/FuzzerSourceSinkFactory.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.streaming.api.operators.GlutenOneInputOperatorFactory;
+import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
+import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
+import org.apache.gluten.table.runtime.operators.GlutenVectorSourceFunction;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+
+import io.github.zhztheplayer.velox4j.connector.CommitStrategy;
+import io.github.zhztheplayer.velox4j.connector.DiscardDataTableHandle;
+import io.github.zhztheplayer.velox4j.connector.FuzzerConnectorSplit;
+import io.github.zhztheplayer.velox4j.connector.FuzzerTableHandle;
+import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.PlanNode;
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+import io.github.zhztheplayer.velox4j.plan.TableWriteNode;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.dag.Transformation;
+import 
org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import 
org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
+import 
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+import java.util.List;
+import java.util.Map;
+
+public class FuzzerSourceSinkFactory implements VeloxSourceSinkFactory {
+
+  @SuppressWarnings({"unchecked"})
+  @Override
+  public boolean match(Transformation<RowData> transformation) {
+    if (transformation instanceof SinkTransformation) {
+      SinkTransformation<RowData, RowData> sinkTransformation =
+          (SinkTransformation<RowData, RowData>) transformation;
+      Transformation<?> inputTransformation = 
sinkTransformation.getInputs().get(0);
+      if (sinkTransformation.getSink() instanceof DiscardingSink
+          && !inputTransformation.getName().equals("PartitionCommitter")) {
+        return true;
+      }
+    } else if (transformation instanceof LegacySourceTransformation) {
+      Function userFunction =
+          ((LegacySourceTransformation<RowData>) 
transformation).getOperator().getUserFunction();
+      return userFunction instanceof DataGeneratorSource;
+    }
+    return false;
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Override
+  public Transformation<RowData> buildVeloxSource(
+      Transformation<RowData> transformation, Map<String, Object> parameters) {
+    LegacySourceTransformation<RowData> sourceTransformation =
+        (LegacySourceTransformation<RowData>) transformation;
+    RowType outputType =
+        (RowType)
+            LogicalTypeConverter.toVLType(
+                ((InternalTypeInfo) 
sourceTransformation.getOutputType()).toLogicalType());
+    String id = PlanNodeIdGenerator.newId();
+    PlanNode tableScan =
+        new TableScanNode(
+            id, outputType, new FuzzerTableHandle("connector-fuzzer", 12367), 
List.of());

Review Comment:
   Could you clarify the meaning of 12367? If it is intentional, please add a comment explaining it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to