PHILO-HE commented on code in PR #9446:
URL: https://github.com/apache/incubator-gluten/pull/9446#discussion_r2066936224
##########
gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java:
##########
@@ -89,61 +90,76 @@ private JobGraph mergeGlutenOperators(JobGraph jobGraph) {
for (JobVertex vertex : jobGraph.getVertices()) {
StreamConfig streamConfig = new
StreamConfig(vertex.getConfiguration());
buildGlutenChains(streamConfig);
+ LOG.debug("Vertex {} is {}.", vertex.getName(), streamConfig);
}
return jobGraph;
}
private void buildGlutenChains(
- StreamConfig taskConfig) {
- // TODO: only support head operator now.
- if (isGlutenOperator(taskConfig)) {
- while (true) {
- List<StreamEdge> outEdges =
taskConfig.getChainedOutputs(userClassloader);
- if (outEdges.size() != 1) {
- // only support operators have one output.
- break;
+ StreamConfig vertexConfig) {
+ Map<Integer, StreamConfig> serializedTasks =
+ vertexConfig.getTransitiveChainedTaskConfigs(userClassloader);
+ Map<Integer, StreamConfig> chainedTasks = new
HashMap<>(serializedTasks.size());
+ serializedTasks.forEach(
+ (id, config) ->
+ chainedTasks.put(id, new
StreamConfig(config.getConfiguration()))
+ );
+ StreamConfig taskConfig = vertexConfig;
+ while (true) {
+ List<StreamEdge> outEdges =
taskConfig.getChainedOutputs(userClassloader);
+ if (outEdges == null || outEdges.isEmpty()) {
+ // only support operators have one output.
+ break;
+ } else if (outEdges.size() > 1) {
Review Comment:
Nit: it seems this `else if` check can be removed and combined with the one
above into a single `if` check:
```java
if (outEdges == null || outEdges.size() != 1) {
break;
}
```
##########
gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import
org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+import
org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorFactory;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.type.Type;
+import org.apache.calcite.rex.RexNode;
+import org.apache.gluten.rexnode.Utils;
+import org.apache.gluten.table.runtime.operators.GlutenSingleInputOperator;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Stream {@link ExecNode} which generates watermark based on the input
elements. */
+@ExecNodeMetadata(
+ name = "stream-exec-watermark-assigner",
+ version = 1,
+ producedTransformations =
StreamExecWatermarkAssigner.WATERMARK_ASSIGNER_TRANSFORMATION,
+ minPlanVersion = FlinkVersion.v1_15,
+ minStateVersion = FlinkVersion.v1_15)
+public class StreamExecWatermarkAssigner extends ExecNodeBase<RowData>
+ implements StreamExecNode<RowData>,
SingleTransformationTranslator<RowData> {
+
+ public static final String WATERMARK_ASSIGNER_TRANSFORMATION =
"watermark-assigner";
+
+ public static final String FIELD_NAME_WATERMARK_EXPR = "watermarkExpr";
+ public static final String FIELD_NAME_ROWTIME_FIELD_INDEX =
"rowtimeFieldIndex";
+
+ @JsonProperty(FIELD_NAME_WATERMARK_EXPR)
+ private final RexNode watermarkExpr;
+
+ @JsonProperty(FIELD_NAME_ROWTIME_FIELD_INDEX)
+ private final int rowtimeFieldIndex;
+
+ public StreamExecWatermarkAssigner(
+ ReadableConfig tableConfig,
+ RexNode watermarkExpr,
+ int rowtimeFieldIndex,
+ InputProperty inputProperty,
+ RowType outputType,
+ String description) {
+ this(
+ ExecNodeContext.newNodeId(),
+ ExecNodeContext.newContext(StreamExecWatermarkAssigner.class),
+
ExecNodeContext.newPersistedConfig(StreamExecWatermarkAssigner.class,
tableConfig),
+ watermarkExpr,
+ rowtimeFieldIndex,
+ Collections.singletonList(inputProperty),
+ outputType,
+ description);
+ }
+
+ @JsonCreator
+ public StreamExecWatermarkAssigner(
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig
persistedConfig,
+ @JsonProperty(FIELD_NAME_WATERMARK_EXPR) RexNode watermarkExpr,
+ @JsonProperty(FIELD_NAME_ROWTIME_FIELD_INDEX) int
rowtimeFieldIndex,
+ @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty>
inputProperties,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+ super(id, context, persistedConfig, inputProperties, outputType,
description);
+ checkArgument(inputProperties.size() == 1);
+ this.watermarkExpr = checkNotNull(watermarkExpr);
+ this.rowtimeFieldIndex = rowtimeFieldIndex;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected Transformation<RowData> translateToPlanInternal(
+ PlannerBase planner, ExecNodeConfig config) {
+ final ExecEdge inputEdge = getInputEdges().get(0);
+ final Transformation<RowData> inputTransform =
+ (Transformation<RowData>) inputEdge.translateToPlan(planner);
+
+ final GeneratedWatermarkGenerator watermarkGenerator =
+ WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(
+ config,
+ planner.getFlinkContext().getClassLoader(),
+ (RowType) inputEdge.getOutputType(),
+ watermarkExpr,
+ JavaScalaConversionUtil.toScala(Optional.empty()));
+
+ final long idleTimeout =
+
config.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT).toMillis();
+
+ final WatermarkAssignerOperatorFactory operatorFactory =
+ new WatermarkAssignerOperatorFactory(
+ rowtimeFieldIndex, idleTimeout, watermarkGenerator);
+
+ io.github.zhztheplayer.velox4j.type.RowType inputType =
+ (io.github.zhztheplayer.velox4j.type.RowType)
+
LogicalTypeConverter.toVLType(inputEdge.getOutputType());
+ List<String> inNames =
Utils.getNamesFromRowType(inputEdge.getOutputType());
+
+ System.out.println("Watermark " + watermarkExpr);
+ //TypedExpr watermarkExprs =
RexNodeConverter.toTypedExpr(watermarkExpr, inNames);
+ io.github.zhztheplayer.velox4j.type.RowType outputType =
+ (io.github.zhztheplayer.velox4j.type.RowType)
+ LogicalTypeConverter.toVLType(getOutputType());
+ // PlanNode watermark = new WatermarkNode(watermarkExprs, inputType,
outputType, idleTimeout, rowtimeFieldIndex);
Review Comment:
Please remove this commented-out line if it is no longer needed, as well as the
commented-out code above.
##########
gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java:
##########
@@ -66,30 +72,41 @@ public PlanNode getPlanNode() {
public String getId() { return id; }
+ public ConnectorSplit getConnectorSplit() { return split; }
+
@Override
public void run(SourceContext<RowData> sourceContext) throws Exception {
- final List<BoundSplit> splits = List.of(new BoundSplit(
- id,
- -1,
- new FuzzerConnectorSplit("connector-fuzzer", 1000)));
- session =
Velox4j.newSession(MemoryManager.create(AllocationListener.NOOP));
+ final List<BoundSplit> splits = List.of(new BoundSplit(id, -1, split));
+ memoryManager = MemoryManager.create(AllocationListener.NOOP);
+ session = Velox4j.newSession(memoryManager);
query = new Query(planNode, splits, Config.empty(),
ConnectorConfig.empty());
allocator = new RootAllocator(Long.MAX_VALUE);
+ UpIterator upIterator = session.queryOps().execute(query);
while (isRunning) {
- CloseableIterator<RowVector> result =
-
UpIterators.asJavaIterator(session.queryOps().execute(query));
- if (result.hasNext()) {
+ UpIterator.State state = upIterator.advance();
+ if (state == UpIterator.State.AVAILABLE) {
+ final RowVector outRv = upIterator.get();
List<RowData> rows = FlinkRowToVLVectorConvertor.toRowData(
- result.next(),
+ outRv,
allocator,
- session,
outputType);
for (RowData row : rows) {
sourceContext.collect(row);
}
+ outRv.close();
+ } else if (state == UpIterator.State.BLOCKED) {
+ System.out.println("Get empty row");
Review Comment:
Prefer using LOG instead of System.out.println.
##########
gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import
org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+import
org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorFactory;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.type.Type;
+import org.apache.calcite.rex.RexNode;
+import org.apache.gluten.rexnode.Utils;
+import org.apache.gluten.table.runtime.operators.GlutenSingleInputOperator;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
Review Comment:
Please document which parts are modified relative to Flink's original class. Is
it only translateToPlanInternal?
##########
gluten-flink/docs/Flink.md:
##########
@@ -105,9 +108,35 @@ bin/sql-client.sh -f data-generator.sql
TODO
## Performance
-Using the data-generator example, it shows that for native execution, it can
generate 10,0000
-records in about 60ms, while Flink generator 10,000 records in about 600ms. It
runs 10 times faster.
-More perf cases to be added.
+We are working on supporting the [Nexmark](https://github.com/nexmark/nexmark)
benchmark for Flink.
+Now the q0 has been supported.
+
+Results show than running with gluten can be 2.x times faster than Flink.
Review Comment:
typo: than -> that
##########
gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java:
##########
@@ -89,61 +90,76 @@ private JobGraph mergeGlutenOperators(JobGraph jobGraph) {
for (JobVertex vertex : jobGraph.getVertices()) {
StreamConfig streamConfig = new
StreamConfig(vertex.getConfiguration());
buildGlutenChains(streamConfig);
+ LOG.debug("Vertex {} is {}.", vertex.getName(), streamConfig);
}
return jobGraph;
}
private void buildGlutenChains(
- StreamConfig taskConfig) {
- // TODO: only support head operator now.
- if (isGlutenOperator(taskConfig)) {
- while (true) {
- List<StreamEdge> outEdges =
taskConfig.getChainedOutputs(userClassloader);
- if (outEdges.size() != 1) {
- // only support operators have one output.
- break;
+ StreamConfig vertexConfig) {
+ Map<Integer, StreamConfig> serializedTasks =
+ vertexConfig.getTransitiveChainedTaskConfigs(userClassloader);
+ Map<Integer, StreamConfig> chainedTasks = new
HashMap<>(serializedTasks.size());
+ serializedTasks.forEach(
+ (id, config) ->
+ chainedTasks.put(id, new
StreamConfig(config.getConfiguration()))
+ );
+ StreamConfig taskConfig = vertexConfig;
Review Comment:
Is this necessary? Why not just use vertexConfig?
##########
gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import
org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+import
org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorFactory;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.type.Type;
+import org.apache.calcite.rex.RexNode;
+import org.apache.gluten.rexnode.Utils;
+import org.apache.gluten.table.runtime.operators.GlutenSingleInputOperator;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Stream {@link ExecNode} which generates watermark based on the input
elements. */
+@ExecNodeMetadata(
+ name = "stream-exec-watermark-assigner",
+ version = 1,
+ producedTransformations =
StreamExecWatermarkAssigner.WATERMARK_ASSIGNER_TRANSFORMATION,
+ minPlanVersion = FlinkVersion.v1_15,
+ minStateVersion = FlinkVersion.v1_15)
+public class StreamExecWatermarkAssigner extends ExecNodeBase<RowData>
+ implements StreamExecNode<RowData>,
SingleTransformationTranslator<RowData> {
+
+ public static final String WATERMARK_ASSIGNER_TRANSFORMATION =
"watermark-assigner";
+
+ public static final String FIELD_NAME_WATERMARK_EXPR = "watermarkExpr";
+ public static final String FIELD_NAME_ROWTIME_FIELD_INDEX =
"rowtimeFieldIndex";
+
+ @JsonProperty(FIELD_NAME_WATERMARK_EXPR)
+ private final RexNode watermarkExpr;
+
+ @JsonProperty(FIELD_NAME_ROWTIME_FIELD_INDEX)
+ private final int rowtimeFieldIndex;
+
+ public StreamExecWatermarkAssigner(
+ ReadableConfig tableConfig,
+ RexNode watermarkExpr,
+ int rowtimeFieldIndex,
+ InputProperty inputProperty,
+ RowType outputType,
+ String description) {
+ this(
+ ExecNodeContext.newNodeId(),
+ ExecNodeContext.newContext(StreamExecWatermarkAssigner.class),
+
ExecNodeContext.newPersistedConfig(StreamExecWatermarkAssigner.class,
tableConfig),
+ watermarkExpr,
+ rowtimeFieldIndex,
+ Collections.singletonList(inputProperty),
+ outputType,
+ description);
+ }
+
+ @JsonCreator
+ public StreamExecWatermarkAssigner(
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig
persistedConfig,
+ @JsonProperty(FIELD_NAME_WATERMARK_EXPR) RexNode watermarkExpr,
+ @JsonProperty(FIELD_NAME_ROWTIME_FIELD_INDEX) int
rowtimeFieldIndex,
+ @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty>
inputProperties,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+ super(id, context, persistedConfig, inputProperties, outputType,
description);
+ checkArgument(inputProperties.size() == 1);
+ this.watermarkExpr = checkNotNull(watermarkExpr);
+ this.rowtimeFieldIndex = rowtimeFieldIndex;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected Transformation<RowData> translateToPlanInternal(
+ PlannerBase planner, ExecNodeConfig config) {
+ final ExecEdge inputEdge = getInputEdges().get(0);
+ final Transformation<RowData> inputTransform =
+ (Transformation<RowData>) inputEdge.translateToPlan(planner);
+
+ final GeneratedWatermarkGenerator watermarkGenerator =
+ WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(
+ config,
+ planner.getFlinkContext().getClassLoader(),
+ (RowType) inputEdge.getOutputType(),
+ watermarkExpr,
+ JavaScalaConversionUtil.toScala(Optional.empty()));
+
+ final long idleTimeout =
+
config.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT).toMillis();
+
+ final WatermarkAssignerOperatorFactory operatorFactory =
+ new WatermarkAssignerOperatorFactory(
+ rowtimeFieldIndex, idleTimeout, watermarkGenerator);
+
+ io.github.zhztheplayer.velox4j.type.RowType inputType =
+ (io.github.zhztheplayer.velox4j.type.RowType)
+
LogicalTypeConverter.toVLType(inputEdge.getOutputType());
+ List<String> inNames =
Utils.getNamesFromRowType(inputEdge.getOutputType());
+
+ System.out.println("Watermark " + watermarkExpr);
Review Comment:
Please remove it or replace it with LOG.
##########
gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java:
##########
@@ -0,0 +1,641 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.common;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ParallelismProvider;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import
org.apache.flink.table.connector.sink.DynamicTableSink.SinkRuntimeProvider;
+import org.apache.flink.table.connector.sink.OutputFormatProvider;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.SinkProvider;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
+import org.apache.flink.table.planner.plan.abilities.sink.RowLevelDeleteSpec;
+import org.apache.flink.table.planner.plan.abilities.sink.RowLevelUpdateSpec;
+import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import
org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
+import
org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import
org.apache.flink.table.planner.plan.nodes.exec.utils.TransformationMetadata;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import
org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer;
+import org.apache.flink.table.runtime.operators.sink.RowKindSetter;
+import org.apache.flink.table.runtime.operators.sink.SinkOperator;
+import
org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.TemporaryClassLoaderContext;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.gluten.table.runtime.operators.GlutenSingleInputOperator;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
Review Comment:
Ditto: please document the modified part.
##########
gluten-flink/runtime/src/main/java/org/apache/flink/client/StreamGraphTranslator.java:
##########
@@ -89,61 +90,76 @@ private JobGraph mergeGlutenOperators(JobGraph jobGraph) {
for (JobVertex vertex : jobGraph.getVertices()) {
StreamConfig streamConfig = new
StreamConfig(vertex.getConfiguration());
buildGlutenChains(streamConfig);
+ LOG.debug("Vertex {} is {}.", vertex.getName(), streamConfig);
}
return jobGraph;
}
private void buildGlutenChains(
Review Comment:
Please add some comments giving a basic description of this method.
##########
gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.translators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.TransformationTranslator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit;
+import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle;
+import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+import io.github.zhztheplayer.velox4j.type.RowType;
+import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
+import org.apache.gluten.table.runtime.operators.GlutenSourceFunction;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
Review Comment:
Please also document which part is revised relative to the original
Flink class.
##########
gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import
org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+import
org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorFactory;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.type.Type;
+import org.apache.calcite.rex.RexNode;
+import org.apache.gluten.rexnode.Utils;
+import org.apache.gluten.table.runtime.operators.GlutenSingleInputOperator;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
Review Comment:
We expect this class to be loaded before Flink's original class so that the
overridden code logic takes effect. Is this always guaranteed? The Gluten Spark
documentation has a guide on configuring the extra class path to ensure the
class loading order; see velox-backend-troubleshooting.md. I am wondering
whether the same problem could happen in Flink.
##########
gluten-flink/runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java:
##########
@@ -179,30 +180,32 @@ private void expand() {
if (sink instanceof DiscardingSink) {
RowType outputType = (RowType)
LogicalTypeConverter.toVLType(
((InternalTypeInfo)
transformation.getOutputType()).toLogicalType());
- RowType outType = new RowType(List.of("num"), List.of(new
BigIntType()));
- String plan = Serde.toJson(new TableWriteNode(
- String.valueOf(transformation.getId()),
+ // TODO: this is a constrain of velox
+ RowType ignore = new RowType(List.of("num"), List.of(new
BigIntType()));
Review Comment:
What does "ignore" actually mean here? Please add a comment or use another
name for better readability.
##########
gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java:
##########
@@ -66,30 +72,41 @@ public PlanNode getPlanNode() {
public String getId() { return id; }
+ public ConnectorSplit getConnectorSplit() { return split; }
+
@Override
public void run(SourceContext<RowData> sourceContext) throws Exception {
- final List<BoundSplit> splits = List.of(new BoundSplit(
- id,
- -1,
- new FuzzerConnectorSplit("connector-fuzzer", 1000)));
- session =
Velox4j.newSession(MemoryManager.create(AllocationListener.NOOP));
+ final List<BoundSplit> splits = List.of(new BoundSplit(id, -1, split));
+ memoryManager = MemoryManager.create(AllocationListener.NOOP);
+ session = Velox4j.newSession(memoryManager);
query = new Query(planNode, splits, Config.empty(),
ConnectorConfig.empty());
allocator = new RootAllocator(Long.MAX_VALUE);
+ UpIterator upIterator = session.queryOps().execute(query);
while (isRunning) {
- CloseableIterator<RowVector> result =
-
UpIterators.asJavaIterator(session.queryOps().execute(query));
- if (result.hasNext()) {
+ UpIterator.State state = upIterator.advance();
+ if (state == UpIterator.State.AVAILABLE) {
+ final RowVector outRv = upIterator.get();
List<RowData> rows = FlinkRowToVLVectorConvertor.toRowData(
- result.next(),
+ outRv,
allocator,
- session,
outputType);
for (RowData row : rows) {
sourceContext.collect(row);
}
+ outRv.close();
+ } else if (state == UpIterator.State.BLOCKED) {
+ System.out.println("Get empty row");
+ } else {
+ System.out.println("Velox task finished");
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]