This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 05debdecc3 [GLUTEN-11588][FLINK] Support rocksdb state for window operator (#11589)
05debdecc3 is described below

commit 05debdecc3027d0ff5b3504b4468896290da0b21
Author: kevinyhzou <[email protected]>
AuthorDate: Fri Mar 6 16:48:16 2026 +0800

    [GLUTEN-11588][FLINK] Support rocksdb state for window operator (#11589)
---
 .github/workflows/flink.yml                        |   2 +-
 .github/workflows/util/install-flink-resources.sh  |  32 ++++-
 gluten-flink/docs/Flink.md                         |  26 +++-
 .../exec/stream/StreamExecWindowAggregate.java     |  38 +++--
 gluten-flink/runtime/pom.xml                       |   6 +
 .../gluten/client/OffloadedJobGraphGenerator.java  |   9 +-
 .../runtime/operators/GlutenOneInputOperator.java  |  17 ++-
 .../runtime/operators/GlutenSessionResource.java   |  11 ++
 .../runtime/operators/GlutenSourceFunction.java    |   2 +-
 .../runtime/operators/GlutenTwoInputOperator.java  |   2 +-
 .../table/runtime/operators/WindowAggOperator.java | 158 +++++++++++++++++++++
 11 files changed, 277 insertions(+), 26 deletions(-)

diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index 3e893afce0..5bd953c77c 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -62,7 +62,7 @@ jobs:
           sudo yum install https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm -y
           sudo .github/workflows/util/install-flink-resources.sh
           git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
-          cd velox4j && git reset --hard 288d181a1b05c47f1f17339eb498dd6375f7aec8
+          cd velox4j && git reset --hard 889bafcf2fa04e8c31a30edbdf40fe203ef58484
           git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
           $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
           cd ..
diff --git a/.github/workflows/util/install-flink-resources.sh b/.github/workflows/util/install-flink-resources.sh
index 23c638cef8..600192a9a1 100755
--- a/.github/workflows/util/install-flink-resources.sh
+++ b/.github/workflows/util/install-flink-resources.sh
@@ -16,6 +16,8 @@
 
 LIBRDKAFKA_VERSION="v2.10.0"
 CPPKAFKA_VERSION="v0.4.1"
+FROCKSDB_VERSION="FRocksDB-6.20.3"
+FROCKSDB_REPO="ververica/frocksdb"
 
 function wget_and_untar {
   local URL=$1
@@ -39,6 +41,28 @@ function wget_and_untar {
   popd
 }
 
+function github_checkout {
+  local REPO=$1
+  shift
+  local VERSION=$1
+  shift
+  local GIT_CLONE_PARAMS=("$@")
+  local DIRNAME
+  DIRNAME=$(basename "$REPO")
+  SUDO="${SUDO:-""}"
+  cd "${DEPENDENCY_DIR}" || exit
+  if [ -z "${DIRNAME}" ]; then
+    echo "Failed to get repo name from ${REPO}"
+    exit 1
+  fi
+  if [ -d "${DIRNAME}" ] && prompt "${DIRNAME} already exists. Delete?"; then
+    ${SUDO} rm -rf "${DIRNAME}"
+  fi
+  if [ ! -d "${DIRNAME}" ]; then
+    git clone -q -b "$VERSION" "${GIT_CLONE_PARAMS[@]}" "https://github.com/${REPO}.git";
+  fi
+}
+
 function cmake_install_dir {
   pushd "./${DEPENDENCY_DIR}/$1"
   # remove the directory argument
@@ -60,7 +84,7 @@ function cmake_install {
   fi
 
   mkdir -p "${BINARY_DIR}"
-  COMPILER_FLAGS=$(get_cxx_flags)
+  COMPILER_FLAGS="-g -gdwarf-2"
   # Add platform specific CXX flags if any
   COMPILER_FLAGS+=${OS_CXXFLAGS}
 
@@ -93,9 +117,15 @@ function install_cppkafka {
   cmake_install_dir cppkafka -DBUILD_TESTS=OFF
 }
 
+function install_rocksdb {
+  github_checkout ${FROCKSDB_REPO} ${FROCKSDB_VERSION}
+  cmake_install_dir frocksdb -DWITH_GFLAGS=OFF -DWITH_TESTS=OFF -DFAIL_ON_WARNINGS=OFF
+}
+
 function install_velox_deps {
   run_and_time install_librdkafka
   run_and_time install_cppkafka
+  run_and_time install_rocksdb
 }
 
 install_velox_deps
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index f84aa9945e..572df629c1 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you have to use the follow
 ## fetch velox4j code
 git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
 cd velox4j
-git reset --hard 288d181a1b05c47f1f17339eb498dd6375f7aec8
+git reset --hard 889bafcf2fa04e8c31a30edbdf40fe203ef58484
 mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
 ```
 **Get gluten**
@@ -140,6 +140,30 @@ bin/sql-client.sh -f data-generator.sql
 
 TODO
 
+### RocksDB State
+
+**Get & compile RocksDB**
+```bash
+git clone -b FRocksDB-6.20.3 https://github.com/ververica/frocksdb.git
+cd frocksdb
+make rocksdbjava -i
+```
+
+**Config RocksDB backend**
+- copy compiled jar package to `${FLINK_HOME}/gluten_lib` directory.
+    ```bash
+    cp ${ROCKSDB_COMPILE_DIR}/java/target/rocksdbjni-6.20.3-linux64.jar ${FLINK_HOME}/gluten_lib
+    ```
+- modify `${FLINK_HOME}/bin/config.sh` as follows
+    ```
+    GLUTEN_JAR="$FLINK_HOME/gluten_lib/gluten-flink-loader-1.6.0.jar:$FLINK_HOME/gluten_lib/velox4j-0.1.0-SNAPSHOT.jar:$FLINK_HOME/gluten_lib/gluten-flink-runtime-1.6.0.jar:$FLINK_HOME/gluten_lib/rocksdbjni-6.20.3-linux64.jar"
+    echo "$GLUTEN_JAR""$FLINK_CLASSPATH""$FLINK_DIST"
+    ```
+- set rocksdb config in `${FLINK_HOME}/conf/config.yaml`
+    ```
+    state.backend.type: rocksdb
+    ```
+
 ## Performance
 We are working on supporting the [Nexmark](https://github.com/nexmark/nexmark) benchmark for Flink.
 Now the q0 has been supported.
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
index 499a65516d..93e0703986 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
@@ -19,7 +19,6 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream;
 import org.apache.gluten.rexnode.AggregateCallConverter;
 import org.apache.gluten.rexnode.Utils;
 import org.apache.gluten.rexnode.WindowUtils;
-import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
 import org.apache.gluten.util.LogicalTypeConverter;
 import org.apache.gluten.util.PlanNodeIdGenerator;
 
@@ -52,12 +51,16 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.TableConfigUtils;
 import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.util.TimeWindowUtil;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -176,6 +179,14 @@ public class StreamExecWindowAggregate extends 
StreamExecWindowAggregateBase {
     final ZoneId shiftTimeZone =
         TimeWindowUtil.getShiftTimeZone(
             windowing.getTimeAttributeType(), 
TableConfigUtils.getLocalTimeZone(config));
+    final AggregateInfoList aggInfoList =
+        AggregateUtil.deriveStreamWindowAggregateInfoList(
+            planner.getTypeFactory(),
+            inputRowType,
+            JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
+            needRetraction,
+            windowing.getWindow(),
+            true); // isStateBackendDataViews
 
     // --- Begin Gluten-specific code changes ---
     // TODO: velox window not equal to flink window.
@@ -245,23 +256,30 @@ public class StreamExecWindowAggregate extends 
StreamExecWindowAggregateBase {
             windowType,
             outputType,
             rowtimeIndex);
-    final OneInputStreamOperator windowOperator =
-        new GlutenOneInputOperator(
+    final RowDataKeySelector selector =
+        KeySelectorUtil.getRowDataSelector(
+            planner.getFlinkContext().getClassLoader(),
+            grouping,
+            InternalTypeInfo.of(inputRowType));
+    LogicalType[] accTypes =
+        Arrays.stream(aggInfoList.getAccTypes())
+            .map(x -> x.getLogicalType())
+            .collect(Collectors.toList())
+            .toArray(new LogicalType[] {});
+    final OneInputStreamOperator<RowData, RowData> windowOperator =
+        new 
org.apache.gluten.table.runtime.operators.WindowAggOperator<RowData, RowData, 
Long>(
             new StatefulPlanNode(windowAgg.getId(), windowAgg),
             PlanNodeIdGenerator.newId(),
             inputType,
             Map.of(windowAgg.getId(), outputType),
             RowData.class,
             RowData.class,
-            "StreamExecWindowAggregate");
+            "StreamExecWindowAggregate",
+            selector.getProducedType(),
+            aggInfoList.getAggNames(),
+            accTypes);
     // --- End Gluten-specific code changes ---
 
-    final RowDataKeySelector selector =
-        KeySelectorUtil.getRowDataSelector(
-            planner.getFlinkContext().getClassLoader(),
-            grouping,
-            InternalTypeInfo.of(inputRowType));
-
     final OneInputTransformation<RowData, RowData> transform =
         ExecNodeUtil.createOneInputTransformation(
             inputTransform,
diff --git a/gluten-flink/runtime/pom.xml b/gluten-flink/runtime/pom.xml
index cfb58166bb..fc3d272d55 100644
--- a/gluten-flink/runtime/pom.xml
+++ b/gluten-flink/runtime/pom.xml
@@ -63,6 +63,12 @@
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-statebackend-rocksdb</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/client/OffloadedJobGraphGenerator.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/client/OffloadedJobGraphGenerator.java
index 622777eebe..42784fce28 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/client/OffloadedJobGraphGenerator.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/client/OffloadedJobGraphGenerator.java
@@ -240,14 +240,7 @@ public class OffloadedJobGraphGenerator {
     Class<?> inClass = supportsVectorInput ? StatefulRecord.class : 
RowData.class;
     Class<?> outClass = supportsVectorOutput ? StatefulRecord.class : 
RowData.class;
     GlutenOneInputOperator<?, ?> newOneInputOp =
-        new GlutenOneInputOperator<>(
-            planNode,
-            sourceOperator.getId(),
-            sourceOperator.getInputType(),
-            sourceOperator.getOutputTypes(),
-            inClass,
-            outClass,
-            sourceOperator.getDescription());
+        sourceOperator.cloneWithInputOutputClasses(inClass, outClass);
     offloadedOpConfig.setStreamOperator(newOneInputOp);
     if (supportsVectorOutput) {
       setOffloadedOutputSerializer(offloadedOpConfig, sourceOperator);
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
index df0b8b921f..a2733c2438 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
@@ -65,7 +65,7 @@ public class GlutenOneInputOperator<IN, OUT> extends 
TableStreamOperator<OUT>
   private transient GlutenSessionResource sessionResource;
   private transient Query query;
   private transient ExternalStreams.BlockingQueue inputQueue;
-  private transient SerialTask task;
+  protected transient SerialTask task;
   private final Class<IN> inClass;
   private final Class<OUT> outClass;
   private transient VectorInputBridge<IN> inputBridge;
@@ -191,6 +191,18 @@ public class GlutenOneInputOperator<IN, OUT> extends 
TableStreamOperator<OUT>
     }
   }
 
+  public <NIN, NOUT> GlutenOneInputOperator<NIN, NOUT> 
cloneWithInputOutputClasses(
+      Class<NIN> newInClass, Class<NOUT> newOutClass) {
+    return new GlutenOneInputOperator<>(
+        this.glutenPlan,
+        this.id,
+        this.inputType,
+        this.outputTypes,
+        newInClass,
+        newOutClass,
+        this.description);
+  }
+
   @Override
   public void processWatermark(Watermark mark) throws Exception {
     task.notifyWatermark(mark.getTimestamp());
@@ -260,8 +272,7 @@ public class GlutenOneInputOperator<IN, OUT> extends 
TableStreamOperator<OUT>
     if (task == null) {
       initSession();
     }
-    // TODO: implement it
-    task.initializeState(0);
+    task.initializeState(0, null);
     super.initializeState(context);
   }
 
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResource.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResource.java
index b54102c466..ea38229e95 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResource.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSessionResource.java
@@ -21,6 +21,8 @@ import 
io.github.zhztheplayer.velox4j.memory.AllocationListener;
 import io.github.zhztheplayer.velox4j.memory.MemoryManager;
 import io.github.zhztheplayer.velox4j.session.Session;
 
+import org.apache.flink.runtime.state.KeyedStateBackend;
+
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 
@@ -29,6 +31,7 @@ class GlutenSessionResource {
   private Session session;
   private MemoryManager memoryManager;
   private BufferAllocator allocator;
+  private KeyedStateBackend<?> keyedStateBackend;
 
   public GlutenSessionResource() {
     this.memoryManager = MemoryManager.create(AllocationListener.NOOP);
@@ -62,4 +65,12 @@ class GlutenSessionResource {
   public BufferAllocator getAllocator() {
     return allocator;
   }
+
+  public KeyedStateBackend<?> getKeyedStateBackend() {
+    return keyedStateBackend;
+  }
+
+  public void setKeyedStateBackend(KeyedStateBackend<?> keyedStateBackend) {
+    this.keyedStateBackend = keyedStateBackend;
+  }
 }
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
index 5c7b0c874d..ea0ddcbc7c 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
@@ -215,7 +215,7 @@ public class GlutenSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
   public void initializeState(FunctionInitializationContext context) throws 
Exception {
     initSession();
     // TODO: implement it
-    this.task.initializeState(0);
+    this.task.initializeState(0, null);
   }
 
   public String[] notifyCheckpointComplete(long checkpointId) throws Exception 
{
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
index 6d4765af97..2352d74943 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java
@@ -253,7 +253,7 @@ public class GlutenTwoInputOperator<IN, OUT> extends 
AbstractStreamOperator<OUT>
   public void initializeState(StateInitializationContext context) throws 
Exception {
     initSession();
     // TODO: implement it
-    task.initializeState(0);
+    task.initializeState(0, null);
     super.initializeState(context);
   }
 
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/WindowAggOperator.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/WindowAggOperator.java
new file mode 100644
index 0000000000..50d91f2c73
--- /dev/null
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/WindowAggOperator.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.table.runtime.operators;
+
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.ReflectUtils;
+
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import 
io.github.zhztheplayer.velox4j.stateful.RocksDBKeyedStateBackendParameters;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.table.data.RowData;
+import 
org.apache.flink.table.runtime.operators.window.tvf.state.WindowValueState;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class WindowAggOperator<IN, OUT, W> extends GlutenOneInputOperator<IN, 
OUT> {
+  private final String windowStateName = "window-aggs";
+  private WindowValueState<W> windowState;
+  private InternalTypeInfo<RowData> keyType;
+  private String[] accNames;
+  private LogicalType[] accTypes;
+
+  public WindowAggOperator(
+      StatefulPlanNode plan,
+      String id,
+      RowType inputType,
+      Map<String, RowType> outputTypes,
+      Class<IN> inClass,
+      Class<OUT> outClass,
+      String description,
+      InternalTypeInfo<RowData> keyType,
+      String[] accNames,
+      LogicalType[] accTypes) {
+    super(plan, id, inputType, outputTypes, inClass, outClass, description);
+    this.keyType = keyType;
+    this.accNames = accNames;
+    this.accTypes = accTypes;
+  }
+
+  public InternalTypeInfo<RowData> getKeyTye() {
+    return keyType;
+  }
+
+  public String[] getAggregateNames() {
+    return accNames;
+  }
+
+  public LogicalType[] getAggregateTypes() {
+    return accTypes;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+    KeyedStateBackend<?> stateBackend = getKeyedStateBackend();
+    ValueStateDescriptor<RowData> descriptor =
+        new ValueStateDescriptor<>(windowStateName, new 
RowDataSerializer(accTypes));
+    ValueState<RowData> state =
+        stateBackend.getOrCreateKeyedState(LongSerializer.INSTANCE, 
descriptor);
+    this.windowState = new WindowValueState<>((InternalValueState<RowData, W, 
RowData>) state);
+    if (stateBackend instanceof RocksDBKeyedStateBackend) {
+      RocksDBKeyedStateBackend<RowData> keyedStateBackend =
+          (RocksDBKeyedStateBackend<RowData>) stateBackend;
+      RocksDB dbInstance =
+          (RocksDB)
+              ReflectUtils.getObjectField(RocksDBKeyedStateBackend.class, 
keyedStateBackend, "db");
+      ColumnFamilyHandle columnFamilyHandle =
+          (ColumnFamilyHandle)
+              ReflectUtils.invokeObjectMethod(
+                  RocksDBKeyedStateBackend.class,
+                  keyedStateBackend,
+                  "getColumnFamilyHandle",
+                  new Class<?>[] {String.class},
+                  new Object[] {windowStateName});
+      String jobId = getRuntimeContext().getJobInfo().getJobId().toString();
+      String operartorId = 
getRuntimeContext().getOperatorUniqueID().toString();
+      List<RowField> accFields = new ArrayList<>();
+      for (int i = 0; i < accNames.length; ++i) {
+        accFields.add(new RowField(accNames[i], accTypes[i]));
+      }
+      RocksDBKeyedStateBackendParameters parameters =
+          new RocksDBKeyedStateBackendParameters(
+              jobId,
+              operartorId,
+              1,
+              dbInstance.getNativeHandle(),
+              keyedStateBackend.getReadOptions().getNativeHandle(),
+              keyedStateBackend.getWriteOptions().getNativeHandle(),
+              List.of(windowStateName),
+              Map.of(windowStateName, operartorId),
+              Map.of(windowStateName, columnFamilyHandle.getNativeHandle()),
+              Map.of(windowStateName, 
LogicalTypeConverter.toVLType(keyType.toLogicalType())),
+              Map.of(
+                  windowStateName,
+                  LogicalTypeConverter.toVLType(
+                      new 
org.apache.flink.table.types.logical.RowType(accFields))),
+              Map.of(windowStateName, new BigIntType()));
+      task.initializeState(0, parameters);
+    }
+  }
+
+  @Override
+  public <NIN, NOUT> WindowAggOperator<NIN, NOUT, W> 
cloneWithInputOutputClasses(
+      Class<NIN> newInClass, Class<NOUT> newOutClass) {
+    return new WindowAggOperator<>(
+        getPlanNode(),
+        getId(),
+        getInputType(),
+        getOutputTypes(),
+        newInClass,
+        newOutClass,
+        getDescription(),
+        keyType,
+        accNames,
+        accTypes);
+  }
+
+  @Override
+  public void setCurrentKey(Object key) {}
+
+  public void close() throws Exception {
+    super.close();
+    if (windowState != null) {}
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to