This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 440cf90d [AURON #2062] Fix AuronKafkaSourceFunction watermark was not 
generated (#2098)
440cf90d is described below

commit 440cf90db79b9f72fbf63d1a1884705bfbed574d
Author: zhangmang <[email protected]>
AuthorDate: Thu Mar 19 11:04:13 2026 +0800

    [AURON #2062] Fix AuronKafkaSourceFunction watermark was not generated 
(#2098)
    
    # Which issue does this PR close?
    
    Closes #2062
    
    # Rationale for this change
    * use `org.apache.flink.table.runtime.generated.WatermarkGenerator` to
    calculate the watermark
    
    # What changes are included in this PR?
    * add `flink-table-runtime` dependency
    * modify `AuronKafkaSourceFunction` to use
    `org.apache.flink.table.runtime.generated.WatermarkGenerator`
    
    # Are there any user-facing changes?
    * No
    
    # How was this patch tested?
    * JNI and Spark need to be decoupled
---
 auron-flink-extension/auron-flink-runtime/pom.xml  |   6 +
 .../kafka/AuronKafkaDynamicTableSource.java        |   2 +-
 .../connector/kafka/AuronKafkaSourceFunction.java  | 238 +++++++++------------
 .../SourceContextWatermarkOutputAdapter.java       |  48 -----
 4 files changed, 107 insertions(+), 187 deletions(-)

diff --git a/auron-flink-extension/auron-flink-runtime/pom.xml 
b/auron-flink-extension/auron-flink-runtime/pom.xml
index 654ce826..d99a2dbf 100644
--- a/auron-flink-extension/auron-flink-runtime/pom.xml
+++ b/auron-flink-extension/auron-flink-runtime/pom.xml
@@ -74,6 +74,12 @@
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-runtime</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
 
     <!-- Kafka client for partition metadata discovery -->
     <dependency>
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
index 837ea581..4b974d29 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
@@ -88,7 +88,7 @@ public class AuronKafkaDynamicTableSource implements 
ScanTableSource, SupportsWa
                 startupMode);
 
         if (watermarkStrategy != null) {
-            sourceFunction.assignTimestampsAndWatermarks(watermarkStrategy);
+            sourceFunction.setWatermarkStrategy(watermarkStrategy);
         }
 
         return new DataStreamScanProvider() {
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
index 52c56fa2..16d16da9 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
@@ -17,10 +17,10 @@
 package org.apache.auron.flink.connector.kafka;
 
 import static org.apache.auron.flink.connector.kafka.KafkaConstants.*;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 import java.io.File;
 import java.io.InputStream;
+import java.lang.reflect.Field;
 import java.util.*;
 import org.apache.auron.flink.arrow.FlinkArrowReader;
 import org.apache.auron.flink.arrow.FlinkArrowUtils;
@@ -38,11 +38,6 @@ import org.apache.auron.protobuf.KafkaStartupMode;
 import org.apache.auron.protobuf.PhysicalPlanNode;
 import org.apache.commons.collections.map.LinkedMap;
 import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.eventtime.TimestampAssigner;
-import org.apache.flink.api.common.eventtime.WatermarkGenerator;
-import org.apache.flink.api.common.eventtime.WatermarkOutput;
-import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
@@ -50,7 +45,6 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
@@ -60,16 +54,17 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
-import 
org.apache.flink.streaming.connectors.kafka.internals.SourceContextWatermarkOutputAdapter;
 import org.apache.flink.table.data.RowData;
+import 
org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier;
+import org.apache.flink.table.runtime.generated.WatermarkGenerator;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.SerializableObject;
-import org.apache.flink.util.SerializedValue;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
@@ -82,9 +77,9 @@ import org.slf4j.LoggerFactory;
  * If checkpoints are enabled, Kafka offsets are committed via Auron after a 
successful checkpoint.
  * If checkpoints are disabled, Kafka offsets are committed periodically via 
Auron.
  *
- * <p>Watermark support is implemented via {@link WatermarkOutputMultiplexer} 
with per-partition
- * watermark generation. Partition expansion is detected periodically using a 
lightweight
- * {@link KafkaConsumer} (metadata queries only, no data consumption).
+ * <p>Watermark support uses the table-runtime {@link WatermarkGenerator} 
directly
+ * (from {@code WatermarkPushDownSpec}) with per-partition watermark tracking.
+ * The combined watermark emitted downstream is the minimum across all 
assigned partitions.
  */
 public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData>
         implements FlinkAuronFunction, CheckpointListener, 
CheckpointedFunction {
@@ -110,7 +105,6 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
     private transient Map<Integer, Long> restoredOffsets;
     private transient Map<Integer, Long> currentOffsets;
     private final SerializableObject lock = new SerializableObject();
-    private SerializedValue<WatermarkStrategy<RowData>> watermarkStrategy;
     private volatile boolean isRunning;
     private transient String auronOperatorIdWithSubtaskIndex;
     private transient MetricNode nativeMetric;
@@ -120,14 +114,11 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
     private transient KafkaConsumer<byte[], byte[]> kafkaConsumer;
     private transient List<Integer> assignedPartitions;
 
-    // Watermark related
-    private transient WatermarkOutputMultiplexer watermarkOutputMultiplexer;
-    private transient Map<Integer, String> partitionIdToOutputIdMap;
-    private transient WatermarkGenerator<RowData> watermarkGenerator;
-    private transient TimestampAssigner<RowData> timestampAssigner;
-    // Periodic watermark control: autoWatermarkInterval > 0 means enabled
-    private transient long autoWatermarkInterval;
-    private transient long lastPeriodicWatermarkTime;
+    // Watermark related: uses table-runtime WatermarkGenerator directly
+    private WatermarkStrategy<RowData> watermarkStrategy;
+    private transient WatermarkGenerator tableWatermarkGenerator;
+    private transient Map<Integer, Long> partitionWatermarks;
+    private transient long currentCombinedWatermark;
 
     public AuronKafkaSourceFunction(
             LogicalType outputType,
@@ -231,22 +222,24 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
                 subtaskIndex,
                 assignedPartitions);
 
-        // 3. Initialize Watermark components if watermarkStrategy is set
+        // 3. Initialize table-runtime WatermarkGenerator if watermarkStrategy 
is set
         if (watermarkStrategy != null) {
-            ClassLoader userCodeClassLoader = 
runtimeContext.getUserCodeClassLoader();
-            WatermarkStrategy<RowData> deserializedWatermarkStrategy =
-                    watermarkStrategy.deserializeValue(userCodeClassLoader);
-
             MetricGroup metricGroup = runtimeContext.getMetricGroup();
-
-            this.timestampAssigner = 
deserializedWatermarkStrategy.createTimestampAssigner(() -> metricGroup);
-
-            this.watermarkGenerator = 
deserializedWatermarkStrategy.createWatermarkGenerator(() -> metricGroup);
-
-            // 4. Determine periodic watermark interval
-            // autoWatermarkInterval > 0 means periodic watermark is enabled
-            this.autoWatermarkInterval = 
runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
-            this.lastPeriodicWatermarkTime = 0L; // Initialize to 0 so first 
emit triggers immediately
+            // Create DataStream API WatermarkGenerator via the strategy
+            org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> 
dsGenerator =
+                    watermarkStrategy.createWatermarkGenerator(() -> 
metricGroup);
+            // Extract inner table-runtime WatermarkGenerator from 
DefaultWatermarkGenerator
+            if (dsGenerator instanceof 
GeneratedWatermarkGeneratorSupplier.DefaultWatermarkGenerator) {
+                Field field = 
GeneratedWatermarkGeneratorSupplier.DefaultWatermarkGenerator.class.getDeclaredField(
+                        "innerWatermarkGenerator");
+                field.setAccessible(true);
+                this.tableWatermarkGenerator = (WatermarkGenerator) 
field.get(dsGenerator);
+            } else {
+                throw new IllegalStateException("Expected 
DefaultWatermarkGenerator from WatermarkPushDownSpec, got: "
+                        + dsGenerator.getClass().getName());
+            }
+            this.partitionWatermarks = new HashMap<>();
+            this.currentCombinedWatermark = Long.MIN_VALUE;
         }
         this.isRunning = true;
     }
@@ -267,97 +260,76 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
         fieldList.addAll(((RowType) outputType).getFields());
         RowType auronOutputRowType = new RowType(fieldList);
 
-        // Initialize WatermarkOutputMultiplexer here because sourceContext is 
available
-        if (watermarkGenerator != null) {
-            this.watermarkOutputMultiplexer =
-                    new WatermarkOutputMultiplexer(new 
SourceContextWatermarkOutputAdapter<>(sourceContext));
-            this.partitionIdToOutputIdMap = new HashMap<>();
-            for (Integer partition : assignedPartitions) {
-                String outputId = createOutputId(partition);
-                partitionIdToOutputIdMap.put(partition, outputId);
-                watermarkOutputMultiplexer.registerNewOutput(outputId, 
watermark -> {});
-            }
-        }
-
         // Pre-check watermark flag to avoid per-record null checks in the hot 
path
-        final boolean enableWatermark = watermarkGenerator != null;
-
-        while (this.isRunning) {
-            AuronCallNativeWrapper wrapper = new AuronCallNativeWrapper(
-                    FlinkArrowUtils.getRootAllocator(),
-                    physicalPlanNode,
-                    nativeMetric,
-                    0,
-                    0,
-                    0,
-                    AuronAdaptor.getInstance()
-                            .getAuronConfiguration()
-                            
.getLong(FlinkAuronConfiguration.NATIVE_MEMORY_SIZE));
-
-            if (enableWatermark) {
-                // Watermark-enabled path
-                while (wrapper.loadNextBatch(batch -> {
-                    Map<Integer, Long> tmpOffsets = new 
HashMap<>(currentOffsets);
-                    FlinkArrowReader arrowReader = 
FlinkArrowReader.create(batch, auronOutputRowType, 3);
-
-                    for (int i = 0; i < batch.getRowCount(); i++) {
-                        AuronColumnarRowData tmpRowData = 
(AuronColumnarRowData) arrowReader.read(i);
-                        // Extract kafka meta fields
-                        int partitionId = tmpRowData.getInt(-3);
-                        long offset = tmpRowData.getLong(-2);
-                        long kafkaTimestamp = tmpRowData.getLong(-1);
-                        tmpOffsets.put(partitionId, offset);
-
-                        // Extract event timestamp via user-defined 
TimestampAssigner
-                        long timestamp = 
timestampAssigner.extractTimestamp(tmpRowData, kafkaTimestamp);
-
-                        // Route to the per-partition WatermarkOutput and 
trigger onEvent
-                        // outputId must not null, else is a bug
-                        String outputId = 
partitionIdToOutputIdMap.get(partitionId);
-                        WatermarkOutput partitionOutput = 
watermarkOutputMultiplexer.getImmediateOutput(outputId);
-                        watermarkGenerator.onEvent(tmpRowData, timestamp, 
partitionOutput);
-                        // Emit record with event timestamp
-                        
sourceContext.collectWithTimestamp(arrowReader.read(i), timestamp);
-                    }
-
-                    // Periodic watermark: only emit if enough time has 
elapsed since last emit
-                    // Controlled by ExecutionConfig.getAutoWatermarkInterval()
-                    long currentTime = System.currentTimeMillis();
-                    if (autoWatermarkInterval > 0
-                            && (currentTime - lastPeriodicWatermarkTime) >= 
autoWatermarkInterval) {
-                        for (Map.Entry<Integer, String> entry : 
partitionIdToOutputIdMap.entrySet()) {
-                            // Use getDeferredOutput for periodic emit: all 
partitions update first,
-                            // then multiplexer merges and emits once via 
onPeriodicEmit()
-                            WatermarkOutput output = 
watermarkOutputMultiplexer.getDeferredOutput(entry.getValue());
-                            watermarkGenerator.onPeriodicEmit(output);
+        final boolean enableWatermark = tableWatermarkGenerator != null;
+
+        AuronCallNativeWrapper wrapper = new AuronCallNativeWrapper(
+                FlinkArrowUtils.getRootAllocator(),
+                physicalPlanNode,
+                nativeMetric,
+                0,
+                0,
+                0,
+                
AuronAdaptor.getInstance().getAuronConfiguration().getLong(FlinkAuronConfiguration.NATIVE_MEMORY_SIZE));
+
+        if (enableWatermark) {
+            // Watermark-enabled path: use table-runtime WatermarkGenerator 
directly
+            while (wrapper.loadNextBatch(batch -> {
+                Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
+                FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, 
auronOutputRowType, 3);
+                for (int i = 0; i < batch.getRowCount(); i++) {
+                    AuronColumnarRowData tmpRowData = (AuronColumnarRowData) 
arrowReader.read(i);
+                    // Extract kafka meta fields
+                    int partitionId = tmpRowData.getInt(-3);
+                    long offset = tmpRowData.getLong(-2);
+                    long kafkaTimestamp = tmpRowData.getLong(-1);
+                    tmpOffsets.put(partitionId, offset);
+
+                    try {
+                        // Compute watermark using table-runtime 
WatermarkGenerator (stateless pure function)
+                        // with local Timezone
+                        Long watermark = 
tableWatermarkGenerator.currentWatermark(tmpRowData);
+                        // Update per-partition watermark tracking
+                        if (watermark != null) {
+                            partitionWatermarks.merge(partitionId, watermark, 
Math::max);
                         }
-                        // Merge all deferred updates and emit the combined 
watermark downstream
-                        watermarkOutputMultiplexer.onPeriodicEmit();
-                        lastPeriodicWatermarkTime = currentTime;
+                    } catch (Exception e) {
+                        throw new RuntimeException("Generated 
WatermarkGenerator fails to generate:", e);
                     }
+                    // Emit record with kafka timestamp
+                    sourceContext.collectWithTimestamp(tmpRowData, 
kafkaTimestamp);
+                }
 
-                    synchronized (lock) {
-                        currentOffsets = tmpOffsets;
-                    }
-                })) {}
-            } else {
-                // No-watermark path: still use collectWithTimestamp with 
kafka timestamp
-                while (wrapper.loadNextBatch(batch -> {
-                    Map<Integer, Long> tmpOffsets = new 
HashMap<>(currentOffsets);
-                    FlinkArrowReader arrowReader = 
FlinkArrowReader.create(batch, auronOutputRowType, 3);
-                    for (int i = 0; i < batch.getRowCount(); i++) {
-                        AuronColumnarRowData tmpRowData = 
(AuronColumnarRowData) arrowReader.read(i);
-                        int partitionId = tmpRowData.getInt(-3);
-                        long offset = tmpRowData.getLong(-2);
-                        long kafkaTimestamp = tmpRowData.getLong(-1);
-                        tmpOffsets.put(partitionId, offset);
-                        
sourceContext.collectWithTimestamp(arrowReader.read(i), kafkaTimestamp);
-                    }
-                    synchronized (lock) {
-                        currentOffsets = tmpOffsets;
+                // After each batch, compute combined watermark (min across 
all partitions) and emit
+                if (!partitionWatermarks.isEmpty()) {
+                    long minWatermark = 
Collections.min(partitionWatermarks.values());
+                    if (minWatermark > currentCombinedWatermark) {
+                        currentCombinedWatermark = minWatermark;
+                        sourceContext.emitWatermark(new 
Watermark(minWatermark));
                     }
-                })) {}
-            }
+                }
+
+                synchronized (lock) {
+                    currentOffsets = tmpOffsets;
+                }
+            })) {}
+        } else {
+            // No-watermark path: still use collectWithTimestamp with kafka 
timestamp
+            while (wrapper.loadNextBatch(batch -> {
+                Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
+                FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, 
auronOutputRowType, 3);
+                for (int i = 0; i < batch.getRowCount(); i++) {
+                    AuronColumnarRowData tmpRowData = (AuronColumnarRowData) 
arrowReader.read(i);
+                    int partitionId = tmpRowData.getInt(-3);
+                    long offset = tmpRowData.getLong(-2);
+                    long kafkaTimestamp = tmpRowData.getLong(-1);
+                    tmpOffsets.put(partitionId, offset);
+                    sourceContext.collectWithTimestamp(tmpRowData, 
kafkaTimestamp);
+                }
+                synchronized (lock) {
+                    currentOffsets = tmpOffsets;
+                }
+            })) {}
         }
         LOG.info("Auron kafka source run end");
     }
@@ -376,6 +348,11 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
             kafkaConsumer.close();
         }
 
+        // Close table-runtime WatermarkGenerator
+        if (tableWatermarkGenerator != null) {
+            tableWatermarkGenerator.close();
+        }
+
         super.close();
     }
 
@@ -478,22 +455,7 @@ public class AuronKafkaSourceFunction extends 
RichParallelSourceFunction<RowData
         }
     }
 
-    public AuronKafkaSourceFunction 
assignTimestampsAndWatermarks(WatermarkStrategy<RowData> watermarkStrategy) {
-        checkNotNull(watermarkStrategy);
-        try {
-            ClosureCleaner.clean(watermarkStrategy, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
-            this.watermarkStrategy = new SerializedValue<>(watermarkStrategy);
-        } catch (Exception e) {
-            throw new IllegalArgumentException("The given WatermarkStrategy is 
not serializable", e);
-        }
-        return this;
-    }
-
-    // 
-------------------------------------------------------------------------
-    //  Internal helpers
-    // 
-------------------------------------------------------------------------
-
-    private String createOutputId(int partitionId) {
-        return topic + "-" + partitionId;
+    public void setWatermarkStrategy(WatermarkStrategy<RowData> 
watermarkStrategy) {
+        this.watermarkStrategy = watermarkStrategy;
     }
 }
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java
deleted file mode 100644
index ea819441..00000000
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.common.eventtime.WatermarkOutput;
-import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-
-/**
- * A {@link org.apache.flink.api.common.eventtime.WatermarkOutput} that 
forwards calls to a {@link
- * 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}.
- */
-public class SourceContextWatermarkOutputAdapter<T> implements WatermarkOutput 
{
-    private final SourceContext<T> sourceContext;
-
-    public SourceContextWatermarkOutputAdapter(SourceContext<T> sourceContext) 
{
-        this.sourceContext = sourceContext;
-    }
-
-    @Override
-    public void emitWatermark(Watermark watermark) {
-        sourceContext.emitWatermark(new 
org.apache.flink.streaming.api.watermark.Watermark(watermark.getTimestamp()));
-    }
-
-    @Override
-    public void markIdle() {
-        sourceContext.markAsTemporarilyIdle();
-    }
-
-    @Override
-    public void markActive() {
-        // will be set active with next watermark
-    }
-}

Reply via email to