This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b26c398a8b6 [FLINK-38988][table] Support INET_ATON and INET_NTOA 
functions for IP address conversion
b26c398a8b6 is described below

commit b26c398a8b60b4af8a0688c65ba0e68e36c6852e
Author: Liu Jiangang <[email protected]>
AuthorDate: Mon Mar 23 08:45:10 2026 +0800

    [FLINK-38988][table] Support INET_ATON and INET_NTOA functions for IP 
address conversion
    
    This closes #27483.
---
 docs/data/sql_functions.yml                        |  18 ++
 docs/data/sql_functions_zh.yml                     |  20 +-
 .../docs/reference/pyflink.table/expressions.rst   |   2 +
 flink-python/pyflink/table/expression.py           |  42 ++++
 .../pyflink/table/tests/test_expression.py         |   4 +
 .../flink/table/api/internal/BaseExpressions.java  |  56 +++++
 .../functions/BuiltInFunctionDefinitions.java      |  20 ++
 .../planner/functions/InetFunctionsITCase.java     | 232 +++++++++++++++++++++
 .../runtime/functions/scalar/InetAtonFunction.java | 159 ++++++++++++++
 .../runtime/functions/scalar/InetNtoaFunction.java |  93 +++++++++
 10 files changed, 645 insertions(+), 1 deletion(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index f8f3e199e93..2091a451aff 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -497,6 +497,24 @@ string:
     description:
       Translates a string into 'application/x-www-form-urlencoded' format 
using the UTF-8 encoding scheme.
       If the input is NULL, or there is an issue with the encoding process, or 
the encoding scheme is not supported, will return NULL.
+  - sql: INET_ATON(string)
+    table: STRING.inetAton()
+    description: |
+      Converts an IPv4 address string to its numeric representation (BIGINT). 
Partially compatible with MySQL INET_ATON.
+
+      Supports MySQL-compatible short-form addresses: a.b is interpreted as 
a.0.0.b, and a.b.c is interpreted as a.b.0.c.
+      Leading zeros in octets are parsed as decimal (not octal), consistent 
with MySQL.
+      Returns NULL if input is NULL or invalid.
+
+      E.g., INET_ATON('127.0.0.1') returns 2130706433; INET_ATON('127.1') 
returns 2130706433 (short-form).
+  - sql: INET_NTOA(integer)
+    table: INTEGER.inetNtoa()
+    description: |
+      Converts a numeric IPv4 address representation back to its string format.
+
+      The input must be in the valid IPv4 range [0, 4294967295]. Returns NULL 
if input is NULL or out of range.
+
+      E.g., INET_NTOA(2130706433) returns '127.0.0.1'; INET_NTOA(0) returns 
'0.0.0.0'.
   - sql: PARSE_URL(string1, string2[, string3])
     table: STRING1.parseUrl(STRING2[, STRING3])
     description: |
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 35a546f6d2e..afde8237a99 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -590,8 +590,26 @@ string:
   - sql: URL_ENCODE(string)
     table: STRING.urlEncode()
     description:
-      使用 UTF-8 编码方案将字符串转换为“application/x-www-form-urlencoded”格式。
+      使用 UTF-8 编码方案将字符串转换为"application/x-www-form-urlencoded"格式。
       如果输入为 NULL,或者编码过程出现问题,或者不支持编码方案,则会返回 NULL。
+  - sql: INET_ATON(string)
+    table: STRING.inetAton()
+    description: |
+      将 IPv4 地址字符串转换为数值表示(BIGINT)。部分兼容 MySQL INET_ATON。
+
+      支持 MySQL 兼容的短格式地址:a.b 被解释为 a.0.0.b,a.b.c 被解释为 a.b.0.c。
+      八位字节中的前导零按十进制解析(而非八进制),与 MySQL 保持一致。
+      如果输入为 NULL 或无效,则返回 NULL。
+
+      例如,INET_ATON('127.0.0.1') 返回 2130706433;INET_ATON('127.1') 返回 
2130706433(短格式)。
+  - sql: INET_NTOA(integer)
+    table: INTEGER.inetNtoa()
+    description: |
+      将数值 IPv4 地址表示转换回字符串格式。
+
+      输入必须在有效的 IPv4 范围内 [0, 4294967295]。如果输入为 NULL 或超出范围,则返回 NULL。
+
+      例如,INET_NTOA(2130706433) 返回 '127.0.0.1';INET_NTOA(0) 返回 '0.0.0.0'。
   - sql: PARSE_URL(string1, string2[, string3])
     table: STRING1.parseUrl(STRING2[, STRING3])
     description: |
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst 
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 77b860eda70..c2ab212c561 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -201,6 +201,8 @@ string functions
     Expression.locate
     Expression.url_decode
     Expression.url_encode
+    Expression.inet_aton
+    Expression.inet_ntoa
     Expression.parse_url
     Expression.printf
     Expression.ltrim
diff --git a/flink-python/pyflink/table/expression.py 
b/flink-python/pyflink/table/expression.py
index 7ed3ff77f6e..7b1e16c0a60 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1447,6 +1447,48 @@ class Expression(Generic[T]):
         """
         return _unary_op("urlEncode")(self)
 
+    def inet_aton(self) -> 'Expression':
+        """
+        Converts an IPv4 address string to its numeric representation (BIGINT).
+        This function follows MySQL INET_ATON behavior.
+
+        The conversion formula is: A * 256^3 + B * 256^2 + C * 256 + D for an 
IP address A.B.C.D
+
+        Supports MySQL-compatible short-form IPv4 addresses:
+          - a — the value is stored directly as an address (value must be in 
[0, 255])
+          - a.b — interpreted as a.0.0.b
+          - a.b.c — interpreted as a.b.0.c
+          - a.b.c.d — standard dotted-decimal format
+
+        Examples:
+          - lit('1').inet_aton() returns 1 (single number)
+          - lit('127.0.0.1').inet_aton() returns 2130706433
+          - lit('127.1').inet_aton() returns 2130706433 (short-form: 127.0.0.1)
+          - lit('0.0.0.0').inet_aton() returns 0
+
+        :return: the numeric representation of the IP address, or null if the 
input is
+                 null or invalid
+        """
+        return _unary_op("inetAton")(self)
+
+    def inet_ntoa(self) -> 'Expression[str]':
+        """
+        Converts a numeric IPv4 address representation back to its string 
format.
+
+        Accepts any integer numeric type (TINYINT, SMALLINT, INT, BIGINT). The 
input must be
+        in the valid IPv4 range [0, 4294967295]. Negative values return null, 
consistent with
+        MySQL's INET_NTOA(-1) = NULL behavior.
+
+        Examples:
+          - lit(2130706433).inet_ntoa() returns '127.0.0.1'
+          - lit(0).inet_ntoa() returns '0.0.0.0'
+          - lit(-1).inet_ntoa() returns null
+
+        :return: the IPv4 address string in dotted-decimal notation, or null 
if the input is
+                 null, negative, or out of valid range
+        """
+        return _unary_op("inetNtoa")(self)
+
     def parse_url(self, part_to_extract: Union[str, 'Expression[str]'],
                   key: Union[str, 'Expression[str]'] = None) -> 
'Expression[str]':
         """
diff --git a/flink-python/pyflink/table/tests/test_expression.py 
b/flink-python/pyflink/table/tests/test_expression.py
index 33628437453..536ee1a08e9 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -184,6 +184,10 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
         self.assertEqual("STARTSWITH(a, b)", str(expr1.starts_with(expr2)))
         self.assertEqual("ENDSWITH(a, b)", str(expr1.ends_with(expr2)))
 
+        # ip address functions
+        self.assertEqual("INET_ATON(a)", str(expr1.inet_aton()))
+        self.assertEqual("INET_NTOA(a)", str(expr1.inet_ntoa()))
+
         # regexp functions
         self.assertEqual("regexp(a, b)", str(expr1.regexp(expr2)))
         self.assertEqual("REGEXP_COUNT(a, b)", str(expr1.regexp_count(expr2)))
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index d2afdc24892..21930bed89b 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -109,6 +109,8 @@ import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.HEX;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.IF;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.IF_NULL;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.IN;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.INET_ATON;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.INET_NTOA;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.INIT_CAP;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.INSTR;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_FALSE;
@@ -1419,6 +1421,60 @@ public abstract class BaseExpressions<InType, OutType> {
         return toApiSpecificExpression(unresolvedCall(URL_ENCODE, toExpr()));
     }
 
+    /**
+     * Converts an IPv4 address string to its numeric representation. This 
function follows MySQL
+     * INET_ATON behavior.
+     *
+     * <p>The conversion formula is: A * 256^3 + B * 256^2 + C * 256 + D for 
an IP address A.B.C.D
+     *
+     * <p>MySQL-compatible short-form IPv4 addresses are supported:
+     *
+     * <ul>
+     *   <li>a — the value is stored directly as an address (value must be in 
[0, 255])
+     *   <li>a.b — interpreted as a.0.0.b
+     *   <li>a.b.c — interpreted as a.b.0.c
+     *   <li>a.b.c.d — standard dotted-decimal format
+     * </ul>
+     *
+     * <p>Leading zeros in octets are parsed as decimal (not octal), 
consistent with MySQL.
+     *
+     * <p>Examples:
+     *
+     * <ul>
+     *   <li>INET_ATON('1') returns 1 (single number)
+     *   <li>INET_ATON('127.0.0.1') returns 2130706433
+     *   <li>INET_ATON('127.1') returns 2130706433 (short-form: 127.0.0.1)
+     *   <li>INET_ATON('0.0.0.0') returns 0
+     * </ul>
+     *
+     * @return the numeric representation of the IP address, or null if the 
input is null or invalid
+     */
+    public OutType inetAton() {
+        return toApiSpecificExpression(unresolvedCall(INET_ATON, toExpr()));
+    }
+
+    /**
+     * Converts a numeric IPv4 address representation back to its string 
format.
+     *
+     * <p>Accepts any integer numeric type (TINYINT, SMALLINT, INT, BIGINT). 
The input must be in
+     * the valid IPv4 range [0, 4294967295]. Negative values return null, 
consistent with MySQL's
+     * {@code INET_NTOA(-1) = NULL} behavior.
+     *
+     * <p>Examples:
+     *
+     * <ul>
+     *   <li>INET_NTOA(2130706433) returns '127.0.0.1'
+     *   <li>INET_NTOA(0) returns '0.0.0.0'
+     *   <li>INET_NTOA(-1) returns NULL
+     * </ul>
+     *
+     * @return the IPv4 address string in dotted-decimal notation, or null if 
the input is null,
+     *     negative, or out of valid range
+     */
+    public OutType inetNtoa() {
+        return toApiSpecificExpression(unresolvedCall(INET_NTOA, toExpr()));
+    }
+
     /**
      * Parse url and return various parameter of the URL. If accept any null 
arguments, return null.
      */
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index b03988321e1..1499b1c0a60 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -465,6 +465,26 @@ public final class BuiltInFunctionDefinitions {
                             
"org.apache.flink.table.runtime.functions.scalar.UrlEncodeFunction")
                     .build();
 
+    public static final BuiltInFunctionDefinition INET_ATON =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("INET_ATON")
+                    .kind(SCALAR)
+                    
.inputTypeStrategy(sequence(logical(LogicalTypeFamily.CHARACTER_STRING)))
+                    .outputTypeStrategy(explicit(BIGINT().nullable()))
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.scalar.InetAtonFunction")
+                    .build();
+
+    public static final BuiltInFunctionDefinition INET_NTOA =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("INET_NTOA")
+                    .kind(SCALAR)
+                    
.inputTypeStrategy(sequence(logical(LogicalTypeFamily.INTEGER_NUMERIC)))
+                    
.outputTypeStrategy(explicit(DataTypes.STRING().nullable()))
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.scalar.InetNtoaFunction")
+                    .build();
+
     public static final BuiltInFunctionDefinition INTERNAL_REPLICATE_ROWS =
             BuiltInFunctionDefinition.newBuilder()
                     .name("$REPLICATE_ROWS$1")
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/InetFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/InetFunctionsITCase.java
new file mode 100644
index 00000000000..d7ee4940b69
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/InetFunctionsITCase.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions;
+
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.Expressions.$;
+
+/**
+ * Tests for {@link BuiltInFunctionDefinitions#INET_ATON} and {@link
+ * BuiltInFunctionDefinitions#INET_NTOA}.
+ */
+public class InetFunctionsITCase extends BuiltInFunctionTestBase {
+
+    @Override
+    Stream<TestSetSpec> getTestSetSpecs() {
+        return Stream.of(
+                inetAtonStandardTestCases(),
+                inetAtonShortFormTestCases(),
+                inetAtonSingleNumberTestCases(),
+                inetAtonLeadingZeroTestCases(),
+                inetAtonInvalidInputTestCases(),
+                inetNtoaTestCases(),
+                inetNtoaIntInputTestCases(),
+                inetNtoaSmallintInputTestCases(),
+                inetNtoaTinyintInputTestCases());
+    }
+
+    /** Test standard 4-octet IPv4 addresses. */
+    private TestSetSpec inetAtonStandardTestCases() {
+        return TestSetSpec.forFunction(BuiltInFunctionDefinitions.INET_ATON)
+                .onFieldsWithData(
+                        "127.0.0.1", // f0: loopback -> 2130706433
+                        "0.0.0.0", // f1: zero -> 0
+                        "255.255.255.255", // f2: max -> 4294967295
+                        "192.168.1.1", // f3: -> 3232235777
+                        "10.0.0.1" // f4: -> 167772161
+                        )
+                .andDataTypes(STRING(), STRING(), STRING(), STRING(), STRING())
+                .testResult($("f0").inetAton(), "INET_ATON(f0)", 2130706433L, 
BIGINT())
+                .testResult($("f1").inetAton(), "INET_ATON(f1)", 0L, BIGINT())
+                .testResult($("f2").inetAton(), "INET_ATON(f2)", 4294967295L, 
BIGINT())
+                .testResult($("f3").inetAton(), "INET_ATON(f3)", 3232235777L, 
BIGINT())
+                .testResult($("f4").inetAton(), "INET_ATON(f4)", 167772161L, 
BIGINT());
+    }
+
+    /** Test MySQL-compatible short-form IPv4 addresses: a.b -> a.0.0.b, a.b.c 
-> a.b.0.c. */
+    private TestSetSpec inetAtonShortFormTestCases() {
+        return TestSetSpec.forFunction(BuiltInFunctionDefinitions.INET_ATON)
+                .onFieldsWithData(
+                        "127.1", // f0: short form a.b -> 127.0.0.1 = 
2130706433
+                        "127.0.1", // f1: short form a.b.c -> 127.0.0.1 = 
2130706433
+                        "10.1", // f2: short form a.b -> 10.0.0.1 = 167772161
+                        "192.168.1", // f3: short form a.b.c -> 192.168.0.1 = 
3232235521
+                        "1.2" // f4: short form a.b -> 1.0.0.2 = 16777218
+                        )
+                .andDataTypes(STRING(), STRING(), STRING(), STRING(), STRING())
+                .testResult($("f0").inetAton(), "INET_ATON(f0)", 2130706433L, 
BIGINT())
+                .testResult($("f1").inetAton(), "INET_ATON(f1)", 2130706433L, 
BIGINT())
+                .testResult($("f2").inetAton(), "INET_ATON(f2)", 167772161L, 
BIGINT())
+                .testResult($("f3").inetAton(), "INET_ATON(f3)", 3232235521L, 
BIGINT())
+                .testResult($("f4").inetAton(), "INET_ATON(f4)", 16777218L, 
BIGINT());
+    }
+
+    /** Test single-number addresses: value is treated as a direct address 
(must be in [0, 255]). */
+    private TestSetSpec inetAtonSingleNumberTestCases() {
+        return TestSetSpec.forFunction(BuiltInFunctionDefinitions.INET_ATON)
+                .onFieldsWithData(
+                        "1", // f0: single number -> 1
+                        "0", // f1: zero -> 0
+                        "255", // f2: max single number -> 255
+                        "256", // f3: exceeds 255 -> null
+                        "167772161", // f4: exceeds 255 -> null
+                        "4294967295" // f5: exceeds 255 -> null
+                        )
+                .andDataTypes(STRING(), STRING(), STRING(), STRING(), 
STRING(), STRING())
+                .testResult($("f0").inetAton(), "INET_ATON(f0)", 1L, BIGINT())
+                .testResult($("f1").inetAton(), "INET_ATON(f1)", 0L, BIGINT())
+                .testResult($("f2").inetAton(), "INET_ATON(f2)", 255L, 
BIGINT())
+                .testResult($("f3").inetAton(), "INET_ATON(f3)", null, 
BIGINT().nullable())
+                .testResult($("f4").inetAton(), "INET_ATON(f4)", null, 
BIGINT().nullable())
+                .testResult($("f5").inetAton(), "INET_ATON(f5)", null, 
BIGINT().nullable());
+    }
+
+    /** Test leading zeros are parsed as decimal (MySQL behavior), not octal. 
*/
+    private TestSetSpec inetAtonLeadingZeroTestCases() {
+        return TestSetSpec.forFunction(BuiltInFunctionDefinitions.INET_ATON)
+                .onFieldsWithData(
+                        "010.000.000.001", // f0: leading zeros as decimal 
(10.0.0.1) = 167772161
+                        "192.168.001.001", // f1: leading zeros as decimal 
(192.168.1.1) =
+                        // 3232235777
+                        "001.002.003.004" // f2: leading zeros as decimal 
(1.2.3.4) = 16909060
+                        )
+                .andDataTypes(STRING(), STRING(), STRING())
+                .testResult($("f0").inetAton(), "INET_ATON(f0)", 167772161L, 
BIGINT())
+                .testResult($("f1").inetAton(), "INET_ATON(f1)", 3232235777L, 
BIGINT())
+                .testResult($("f2").inetAton(), "INET_ATON(f2)", 16909060L, 
BIGINT());
+    }
+
+    /** Test invalid inputs return null. */
+    private TestSetSpec inetAtonInvalidInputTestCases() {
+        return TestSetSpec.forFunction(BuiltInFunctionDefinitions.INET_ATON)
+                .onFieldsWithData(
+                        null, // f0: null
+                        "", // f1: empty
+                        "invalid", // f2: invalid format
+                        "256.0.0.1", // f3: octet out of range (256 > 255)
+                        "1.2.3.4.5", // f4: extra octet (5 octets)
+                        "1.2.3.", // f5: trailing dot
+                        ".1.2.3", // f6: leading dot
+                        "1..2.3", // f7: double dot
+                        " 127.0.0.1", // f8: leading space (no trim)
+                        "127.0.0.1 " // f9: trailing space (no trim)
+                        )
+                .andDataTypes(
+                        STRING(), STRING(), STRING(), STRING(), STRING(), 
STRING(), STRING(),
+                        STRING(), STRING(), STRING())
+                .testResult($("f0").inetAton(), "INET_ATON(f0)", null, 
BIGINT().nullable())
+                .testResult($("f1").inetAton(), "INET_ATON(f1)", null, 
BIGINT().nullable())
+                .testResult($("f2").inetAton(), "INET_ATON(f2)", null, 
BIGINT().nullable())
+                .testResult($("f3").inetAton(), "INET_ATON(f3)", null, 
BIGINT().nullable())
+                .testResult($("f4").inetAton(), "INET_ATON(f4)", null, 
BIGINT().nullable())
+                .testResult($("f5").inetAton(), "INET_ATON(f5)", null, 
BIGINT().nullable())
+                .testResult($("f6").inetAton(), "INET_ATON(f6)", null, 
BIGINT().nullable())
+                .testResult($("f7").inetAton(), "INET_ATON(f7)", null, 
BIGINT().nullable())
+                .testResult($("f8").inetAton(), "INET_ATON(f8)", null, 
BIGINT().nullable())
+                .testResult($("f9").inetAton(), "INET_ATON(f9)", null, 
BIGINT().nullable());
+    }
+
+    /** Test INET_NTOA with BIGINT inputs. */
+    private TestSetSpec inetNtoaTestCases() {
+        return TestSetSpec.forFunction(BuiltInFunctionDefinitions.INET_NTOA)
+                .onFieldsWithData(
+                        2130706433L, // f0: 127.0.0.1
+                        0L, // f1: 0.0.0.0
+                        4294967295L, // f2: 255.255.255.255
+                        3232235777L, // f3: 192.168.1.1
+                        167772161L, // f4: 10.0.0.1
+                        null, // f5: null
+                        -1L, // f6: negative (invalid)
+                        4294967296L // f7: exceeds max (invalid)
+                        )
+                .andDataTypes(
+                        BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(), 
BIGINT(), BIGINT(),
+                        BIGINT())
+                .testResult($("f0").inetNtoa(), "INET_NTOA(f0)", "127.0.0.1", 
STRING())
+                .testResult($("f1").inetNtoa(), "INET_NTOA(f1)", "0.0.0.0", 
STRING())
+                .testResult($("f2").inetNtoa(), "INET_NTOA(f2)", 
"255.255.255.255", STRING())
+                .testResult($("f3").inetNtoa(), "INET_NTOA(f3)", 
"192.168.1.1", STRING())
+                .testResult($("f4").inetNtoa(), "INET_NTOA(f4)", "10.0.0.1", 
STRING())
+                .testResult($("f5").inetNtoa(), "INET_NTOA(f5)", null, 
STRING().nullable())
+                .testResult($("f6").inetNtoa(), "INET_NTOA(f6)", null, 
STRING().nullable())
+                .testResult($("f7").inetNtoa(), "INET_NTOA(f7)", null, 
STRING().nullable());
+    }
+
+    /** Test INET_NTOA with INT inputs. */
+    private TestSetSpec inetNtoaIntInputTestCases() {
+        return TestSetSpec.forFunction(BuiltInFunctionDefinitions.INET_NTOA)
+                .onFieldsWithData(
+                        2130706433, // f0: 127.0.0.1 (as INT)
+                        0, // f1: 0.0.0.0 (as INT)
+                        167772161, // f2: 10.0.0.1 (as INT)
+                        (Integer) null, // f3: null (as INT)
+                        -1 // f4: negative -> null (consistent with MySQL)
+                        )
+                .andDataTypes(INT(), INT(), INT(), INT(), INT())
+                .testResult($("f0").inetNtoa(), "INET_NTOA(f0)", "127.0.0.1", 
STRING())
+                .testResult($("f1").inetNtoa(), "INET_NTOA(f1)", "0.0.0.0", 
STRING())
+                .testResult($("f2").inetNtoa(), "INET_NTOA(f2)", "10.0.0.1", 
STRING())
+                .testResult($("f3").inetNtoa(), "INET_NTOA(f3)", null, 
STRING().nullable())
+                .testResult($("f4").inetNtoa(), "INET_NTOA(f4)", null, 
STRING().nullable());
+    }
+
+    /** Test INET_NTOA with SMALLINT inputs. */
+    private TestSetSpec inetNtoaSmallintInputTestCases() {
+        return TestSetSpec.forFunction(BuiltInFunctionDefinitions.INET_NTOA)
+                .onFieldsWithData(
+                        (short) 256, // f0: 0.0.1.0
+                        (short) 0, // f1: 0.0.0.0
+                        (short) 1, // f2: 0.0.0.1
+                        (Short) null, // f3: null
+                        (short) -1 // f4: negative -> null (consistent with 
MySQL)
+                        )
+                .andDataTypes(SMALLINT(), SMALLINT(), SMALLINT(), SMALLINT(), 
SMALLINT())
+                .testResult($("f0").inetNtoa(), "INET_NTOA(f0)", "0.0.1.0", 
STRING())
+                .testResult($("f1").inetNtoa(), "INET_NTOA(f1)", "0.0.0.0", 
STRING())
+                .testResult($("f2").inetNtoa(), "INET_NTOA(f2)", "0.0.0.1", 
STRING())
+                .testResult($("f3").inetNtoa(), "INET_NTOA(f3)", null, 
STRING().nullable())
+                .testResult($("f4").inetNtoa(), "INET_NTOA(f4)", null, 
STRING().nullable());
+    }
+
+    /** Test INET_NTOA with TINYINT inputs. */
+    private TestSetSpec inetNtoaTinyintInputTestCases() {
+        return TestSetSpec.forFunction(BuiltInFunctionDefinitions.INET_NTOA)
+                .onFieldsWithData(
+                        (byte) 1, // f0: 0.0.0.1
+                        (byte) 0, // f1: 0.0.0.0
+                        (byte) 127, // f2: 0.0.0.127
+                        (Byte) null, // f3: null
+                        (byte) -1 // f4: negative -> null (consistent with 
MySQL)
+                        )
+                .andDataTypes(TINYINT(), TINYINT(), TINYINT(), TINYINT(), 
TINYINT())
+                .testResult($("f0").inetNtoa(), "INET_NTOA(f0)", "0.0.0.1", 
STRING())
+                .testResult($("f1").inetNtoa(), "INET_NTOA(f1)", "0.0.0.0", 
STRING())
+                .testResult($("f2").inetNtoa(), "INET_NTOA(f2)", "0.0.0.127", 
STRING())
+                .testResult($("f3").inetNtoa(), "INET_NTOA(f3)", null, 
STRING().nullable())
+                .testResult($("f4").inetNtoa(), "INET_NTOA(f4)", null, 
STRING().nullable());
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/InetAtonFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/InetAtonFunction.java
new file mode 100644
index 00000000000..b75c48925ba
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/InetAtonFunction.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#INET_ATON}.
+ *
+ * <p>This function converts an IPv4 address string to its numeric 
representation. It is partially
+ * compatible with MySQL INET_ATON, including support for short-form IPv4 
addresses.
+ *
+ * <p>The conversion formula for a standard IP address A.B.C.D is: A * 256^3 + 
B * 256^2 + C * 256 +
+ * D
+ *
+ * <p>MySQL-compatible short-form IPv4 addresses are supported:
+ *
+ * <ul>
+ *   <li>a — the value is stored directly as an address (value must be in [0, 
255])
+ *   <li>a.b — interpreted as a.0.0.b
+ *   <li>a.b.c — interpreted as a.b.0.c
+ *   <li>a.b.c.d — standard dotted-decimal format
+ * </ul>
+ *
+ * <p>Leading zeros in octets are parsed as decimal (consistent with MySQL), 
not octal.
+ *
+ * <p>Note: This function only supports IPv4 addresses. IPv6 addresses are not 
supported.
+ *
+ * <p>Examples:
+ *
+ * <ul>
+ *   <li>INET_ATON('1') returns 1 (single number)
+ *   <li>INET_ATON('127.0.0.1') returns 2130706433
+ *   <li>INET_ATON('127.1') returns 2130706433 (short-form: 127.0.0.1)
+ *   <li>INET_ATON('127.0.1') returns 2130706433 (short-form: 127.0.0.1)
+ *   <li>INET_ATON('10.0.0.1') returns 167772161
+ *   <li>INET_ATON('0.0.0.0') returns 0
+ * </ul>
+ */
+@Internal
+public class InetAtonFunction extends BuiltInScalarFunction {
+
+    public InetAtonFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.INET_ATON, context);
+    }
+
+    /**
+     * Converts an IPv4 address string to its numeric representation.
+     *
+     * @param ipAddress the IPv4 address string in dotted-decimal notation 
(supports short-form)
+     * @return the numeric representation of the IP address, or null if input 
is null or invalid
+     */
+    public @Nullable Long eval(@Nullable StringData ipAddress) {
+        if (ipAddress == null) {
+            return null;
+        }
+
+        BinaryStringData binaryStr = (BinaryStringData) ipAddress;
+        if (BinaryStringDataUtil.isEmpty(binaryStr)) {
+            return null;
+        }
+
+        return ipToLong(binaryStr);
+    }
+
+    /**
+     * Converts an IPv4 address string to a long value.
+     *
+     * <p>Operates directly on UTF-8 bytes from {@link BinaryStringData} to 
avoid unnecessary String
+     * object allocation. Since IPv4 addresses contain only ASCII characters 
('0'-'9' and '.'), each
+     * character is exactly one byte in UTF-8 encoding.
+     *
+     * <p>Supports MySQL-compatible short-form addresses:
+     *
+     * <ul>
+     *   <li>a — direct value (value must be in [0, 255])
+     *   <li>a.b — a.0.0.b (each part must be in [0, 255])
+     *   <li>a.b.c — a.b.0.c (each part must be in [0, 255])
+     *   <li>a.b.c.d — standard format (each part must be in [0, 255])
+     * </ul>
+     *
+     * <p>Leading zeros are treated as decimal (not octal), consistent with 
MySQL behavior.
+     *
+     * @param ip the IPv4 address as BinaryStringData
+     * @return the long value, or null if the IP address is invalid
+     */
+    private static @Nullable Long ipToLong(BinaryStringData ip) {
+        int len = ip.getSizeInBytes();
+        int partCount = 0;
+        int partStart = 0;
+        long[] parts = new long[4];
+
+        for (int i = 0; i <= len; i++) {
+            if (i == len || ip.byteAt(i) == '.') {
+                if (partCount >= 4) {
+                    return null;
+                }
+                if (partStart == i) {
+                    return null;
+                }
+
+                // Parse number manually from bytes
+                long value = 0;
+                for (int j = partStart; j < i; j++) {
+                    byte b = ip.byteAt(j);
+                    if (b < '0' || b > '9') {
+                        return null;
+                    }
+                    value = value * 10 + (b - '0');
+                    if (value > 255) {
+                        return null;
+                    }
+                }
+                parts[partCount++] = value;
+                partStart = i + 1;
+            }
+        }
+
+        switch (partCount) {
+            case 1:
+                // Single number: direct value
+                return parts[0];
+            case 2:
+                // a.b -> a.0.0.b
+                return (parts[0] << 24) | parts[1];
+            case 3:
+                // a.b.c -> a.b.0.c
+                return (parts[0] << 24) | (parts[1] << 16) | parts[2];
+            case 4:
+                // a.b.c.d -> standard format
+                return (parts[0] << 24) | (parts[1] << 16) | (parts[2] << 8) | 
parts[3];
+            default:
+                return null;
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/InetNtoaFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/InetNtoaFunction.java
new file mode 100644
index 00000000000..6e2333072a2
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/InetNtoaFunction.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#INET_NTOA}.
+ *
+ * <p>This function converts a numeric IPv4 address representation back to its 
string format.
+ *
+ * <p>The conversion extracts each octet from the numeric value using bit 
shifting and masking.
+ *
+ * <p>Accepts any integer numeric type (TINYINT, SMALLINT, INT, BIGINT). 
Negative values return
+ * null, consistent with MySQL's {@code INET_NTOA(-1) = NULL} behavior.
+ *
+ * <p>Note: This function only supports IPv4 addresses. IPv6 addresses are not 
supported.
+ *
+ * <p>Examples:
+ *
+ * <ul>
+ *   <li>INET_NTOA(2130706433) returns '127.0.0.1'
+ *   <li>INET_NTOA(167772161) returns '10.0.0.1'
+ *   <li>INET_NTOA(0) returns '0.0.0.0'
+ *   <li>INET_NTOA(-1) returns NULL
+ * </ul>
+ */
+@Internal
+public class InetNtoaFunction extends BuiltInScalarFunction {
+
+    public InetNtoaFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.INET_NTOA, context);
+    }
+
+    /**
+     * Converts a numeric IPv4 address representation to its string format.
+     *
+     * <p>Accepts any integer numeric type (TINYINT, SMALLINT, INT, BIGINT). 
Negative values return
+     * null, consistent with MySQL's {@code INET_NTOA(-1) = NULL} behavior.
+     *
+     * @param ipNumber the numeric representation of the IPv4 address
+     * @return the IPv4 address string in dotted-decimal notation, or null if 
input is null,
+     *     negative, or out of valid range [0, 4294967295]
+     */
+    public @Nullable StringData eval(@Nullable Number ipNumber) {
+        if (ipNumber == null) {
+            return null;
+        }
+        return longToIp(ipNumber.longValue());
+    }
+
+    /**
+     * Converts a numeric IPv4 address to its dotted-decimal string format.
+     *
+     * @param ipNumber the numeric representation
+     * @return the IPv4 address string, or null if out of valid range [0, 
4294967295]
+     */
+    private static @Nullable StringData longToIp(long ipNumber) {
+        if (ipNumber < 0 || ipNumber > 0xFFFFFFFFL) {
+            return null;
+        }
+
+        return StringData.fromString(
+                ((ipNumber >> 24) & 0xFF)
+                        + "."
+                        + ((ipNumber >> 16) & 0xFF)
+                        + "."
+                        + ((ipNumber >> 8) & 0xFF)
+                        + "."
+                        + (ipNumber & 0xFF));
+    }
+}


Reply via email to