This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b26c398a8b6 [FLINK-38988][table] Support INET_ATON and INET_NTOA
functions for IP address conversion
b26c398a8b6 is described below
commit b26c398a8b60b4af8a0688c65ba0e68e36c6852e
Author: Liu Jiangang <[email protected]>
AuthorDate: Mon Mar 23 08:45:10 2026 +0800
[FLINK-38988][table] Support INET_ATON and INET_NTOA functions for IP
address conversion
This closes #27483.
---
docs/data/sql_functions.yml | 18 ++
docs/data/sql_functions_zh.yml | 20 +-
.../docs/reference/pyflink.table/expressions.rst | 2 +
flink-python/pyflink/table/expression.py | 42 ++++
.../pyflink/table/tests/test_expression.py | 4 +
.../flink/table/api/internal/BaseExpressions.java | 56 +++++
.../functions/BuiltInFunctionDefinitions.java | 20 ++
.../planner/functions/InetFunctionsITCase.java | 232 +++++++++++++++++++++
.../runtime/functions/scalar/InetAtonFunction.java | 159 ++++++++++++++
.../runtime/functions/scalar/InetNtoaFunction.java | 93 +++++++++
10 files changed, 645 insertions(+), 1 deletion(-)
diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index f8f3e199e93..2091a451aff 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -497,6 +497,24 @@ string:
description:
Translates a string into 'application/x-www-form-urlencoded' format
using the UTF-8 encoding scheme.
If the input is NULL, or there is an issue with the encoding process, or
the encoding scheme is not supported, will return NULL.
+ - sql: INET_ATON(string)
+ table: STRING.inetAton()
+ description: |
+ Converts an IPv4 address string to its numeric representation (BIGINT).
Partially compatible with MySQL INET_ATON.
+
+ Supports MySQL-compatible short-form addresses: a.b is interpreted as
a.0.0.b, a.b.c is interpreted as a.b.0.c, and a single number a is returned as-is (each part must be in [0, 255]).
+ Leading zeros in octets are parsed as decimal (not octal), consistent
with MySQL.
+ Returns NULL if input is NULL or invalid.
+
+ E.g., INET_ATON('127.0.0.1') returns 2130706433; INET_ATON('127.1')
returns 2130706433 (short-form).
+ - sql: INET_NTOA(integer)
+ table: INTEGER.inetNtoa()
+ description: |
+ Converts a numeric IPv4 address representation back to its string format.
+
+ The input must be in the valid IPv4 range [0, 4294967295]. Returns NULL
if input is NULL or out of range.
+
+ E.g., INET_NTOA(2130706433) returns '127.0.0.1'; INET_NTOA(0) returns
'0.0.0.0'.
- sql: PARSE_URL(string1, string2[, string3])
table: STRING1.parseUrl(STRING2[, STRING3])
description: |
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 35a546f6d2e..afde8237a99 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -590,8 +590,26 @@ string:
- sql: URL_ENCODE(string)
table: STRING.urlEncode()
description:
- 使用 UTF-8 编码方案将字符串转换为“application/x-www-form-urlencoded”格式。
+ 使用 UTF-8 编码方案将字符串转换为"application/x-www-form-urlencoded"格式。
如果输入为 NULL,或者编码过程出现问题,或者不支持编码方案,则会返回 NULL。
+ - sql: INET_ATON(string)
+ table: STRING.inetAton()
+ description: |
+ 将 IPv4 地址字符串转换为数值表示(BIGINT)。部分兼容 MySQL INET_ATON。
+
+ 支持 MySQL 兼容的短格式地址:a.b 被解释为 a.0.0.b,a.b.c 被解释为 a.b.0.c。
+ 八位字节中的前导零按十进制解析(而非八进制),与 MySQL 保持一致。
+ 如果输入为 NULL 或无效,则返回 NULL。
+
+ 例如,INET_ATON('127.0.0.1') 返回 2130706433;INET_ATON('127.1') 返回
2130706433(短格式)。
+ - sql: INET_NTOA(integer)
+ table: INTEGER.inetNtoa()
+ description: |
+ 将数值 IPv4 地址表示转换回字符串格式。
+
+ 输入必须在有效的 IPv4 范围内 [0, 4294967295]。如果输入为 NULL 或超出范围,则返回 NULL。
+
+ 例如,INET_NTOA(2130706433) 返回 '127.0.0.1';INET_NTOA(0) 返回 '0.0.0.0'。
- sql: PARSE_URL(string1, string2[, string3])
table: STRING1.parseUrl(STRING2[, STRING3])
description: |
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 77b860eda70..c2ab212c561 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -201,6 +201,8 @@ string functions
Expression.locate
Expression.url_decode
Expression.url_encode
+ Expression.inet_aton
+ Expression.inet_ntoa
Expression.parse_url
Expression.printf
Expression.ltrim
diff --git a/flink-python/pyflink/table/expression.py
b/flink-python/pyflink/table/expression.py
index 7ed3ff77f6e..7b1e16c0a60 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1447,6 +1447,48 @@ class Expression(Generic[T]):
"""
return _unary_op("urlEncode")(self)
+ def inet_aton(self) -> 'Expression':
+ """
+ Converts an IPv4 address string to its numeric representation (BIGINT).
+ This function is partially compatible with MySQL INET_ATON.
+
+ The conversion formula is: A * 256^3 + B * 256^2 + C * 256 + D for an
IP address A.B.C.D
+
+ Supports MySQL-compatible short-form IPv4 addresses:
+ - a — the value is stored directly as an address (value must be in
[0, 255])
+ - a.b — interpreted as a.0.0.b
+ - a.b.c — interpreted as a.b.0.c
+ - a.b.c.d — standard dotted-decimal format
+
+ Examples:
+ - lit('1').inet_aton() returns 1 (single number)
+ - lit('127.0.0.1').inet_aton() returns 2130706433
+ - lit('127.1').inet_aton() returns 2130706433 (short-form: 127.0.0.1)
+ - lit('0.0.0.0').inet_aton() returns 0
+
+ :return: the numeric representation of the IP address, or null if the
input is
+ null or invalid
+ """
+ return _unary_op("inetAton")(self)
+
+ def inet_ntoa(self) -> 'Expression[str]':
+ """
+ Converts a numeric IPv4 address representation back to its string
format.
+
+ Accepts any integer numeric type (TINYINT, SMALLINT, INT, BIGINT). The
input must be
+ in the valid IPv4 range [0, 4294967295]. Negative values return null,
consistent with
+ MySQL's INET_NTOA(-1) = NULL behavior.
+
+ Examples:
+ - lit(2130706433).inet_ntoa() returns '127.0.0.1'
+ - lit(0).inet_ntoa() returns '0.0.0.0'
+ - lit(-1).inet_ntoa() returns null
+
+ :return: the IPv4 address string in dotted-decimal notation, or null
if the input is
+ null, negative, or out of valid range
+ """
+ return _unary_op("inetNtoa")(self)
+
def parse_url(self, part_to_extract: Union[str, 'Expression[str]'],
key: Union[str, 'Expression[str]'] = None) ->
'Expression[str]':
"""
diff --git a/flink-python/pyflink/table/tests/test_expression.py
b/flink-python/pyflink/table/tests/test_expression.py
index 33628437453..536ee1a08e9 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -184,6 +184,10 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
self.assertEqual("STARTSWITH(a, b)", str(expr1.starts_with(expr2)))
self.assertEqual("ENDSWITH(a, b)", str(expr1.ends_with(expr2)))
+ # ip address functions
+ self.assertEqual("INET_ATON(a)", str(expr1.inet_aton()))
+ self.assertEqual("INET_NTOA(a)", str(expr1.inet_ntoa()))
+
# regexp functions
self.assertEqual("regexp(a, b)", str(expr1.regexp(expr2)))
self.assertEqual("REGEXP_COUNT(a, b)", str(expr1.regexp_count(expr2)))
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index d2afdc24892..21930bed89b 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -109,6 +109,8 @@ import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.HEX;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.IF;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.IF_NULL;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.IN;
+import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.INET_ATON;
+import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.INET_NTOA;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.INIT_CAP;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.INSTR;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_FALSE;
@@ -1419,6 +1421,60 @@ public abstract class BaseExpressions<InType, OutType> {
return toApiSpecificExpression(unresolvedCall(URL_ENCODE, toExpr()));
}
+ /**
+ * Converts an IPv4 address string to its numeric representation. This
function is partially
+ * compatible with MySQL INET_ATON.
+ *
+ * <p>The conversion formula is: A * 256^3 + B * 256^2 + C * 256 + D for
an IP address A.B.C.D
+ *
+ * <p>MySQL-compatible short-form IPv4 addresses are supported:
+ *
+ * <ul>
+ * <li>a — the value is stored directly as an address (value must be in
[0, 255])
+ * <li>a.b — interpreted as a.0.0.b
+ * <li>a.b.c — interpreted as a.b.0.c
+ * <li>a.b.c.d — standard dotted-decimal format
+ * </ul>
+ *
+ * <p>Leading zeros in octets are parsed as decimal (not octal),
consistent with MySQL.
+ *
+ * <p>Examples:
+ *
+ * <ul>
+ * <li>INET_ATON('1') returns 1 (single number)
+ * <li>INET_ATON('127.0.0.1') returns 2130706433
+ * <li>INET_ATON('127.1') returns 2130706433 (short-form: 127.0.0.1)
+ * <li>INET_ATON('0.0.0.0') returns 0
+ * </ul>
+ *
+ * @return the numeric representation of the IP address, or null if the
input is null or invalid
+ */
+ public OutType inetAton() {
+ return toApiSpecificExpression(unresolvedCall(INET_ATON, toExpr()));
+ }
+
+ /**
+ * Converts a numeric IPv4 address representation back to its string
format.
+ *
+ * <p>Accepts any integer numeric type (TINYINT, SMALLINT, INT, BIGINT).
The input must be in
+ * the valid IPv4 range [0, 4294967295]. Negative values return null,
consistent with MySQL's
+ * {@code INET_NTOA(-1) = NULL} behavior.
+ *
+ * <p>Examples:
+ *
+ * <ul>
+ * <li>INET_NTOA(2130706433) returns '127.0.0.1'
+ * <li>INET_NTOA(0) returns '0.0.0.0'
+ * <li>INET_NTOA(-1) returns NULL
+ * </ul>
+ *
+ * @return the IPv4 address string in dotted-decimal notation, or null if
the input is null,
+ * negative, or out of valid range
+ */
+ public OutType inetNtoa() {
+ return toApiSpecificExpression(unresolvedCall(INET_NTOA, toExpr()));
+ }
+
/**
* Parse url and return various parameter of the URL. If accept any null
arguments, return null.
*/
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index b03988321e1..1499b1c0a60 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -465,6 +465,26 @@ public final class BuiltInFunctionDefinitions {
"org.apache.flink.table.runtime.functions.scalar.UrlEncodeFunction")
.build();
+ public static final BuiltInFunctionDefinition INET_ATON =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("INET_ATON")
+ .kind(SCALAR)
+
.inputTypeStrategy(sequence(logical(LogicalTypeFamily.CHARACTER_STRING)))
+ .outputTypeStrategy(explicit(BIGINT().nullable()))
+ .runtimeClass(
+
"org.apache.flink.table.runtime.functions.scalar.InetAtonFunction")
+ .build();
+
+ public static final BuiltInFunctionDefinition INET_NTOA =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("INET_NTOA")
+ .kind(SCALAR)
+
.inputTypeStrategy(sequence(logical(LogicalTypeFamily.INTEGER_NUMERIC)))
+
.outputTypeStrategy(explicit(DataTypes.STRING().nullable()))
+ .runtimeClass(
+
"org.apache.flink.table.runtime.functions.scalar.InetNtoaFunction")
+ .build();
+
public static final BuiltInFunctionDefinition INTERNAL_REPLICATE_ROWS =
BuiltInFunctionDefinition.newBuilder()
.name("$REPLICATE_ROWS$1")
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/InetFunctionsITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/InetFunctionsITCase.java
new file mode 100644
index 00000000000..d7ee4940b69
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/InetFunctionsITCase.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions;
+
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.Expressions.$;
+
+/**
+ * Tests for {@link BuiltInFunctionDefinitions#INET_ATON} and {@link
+ * BuiltInFunctionDefinitions#INET_NTOA}.
+ */
+public class InetFunctionsITCase extends BuiltInFunctionTestBase {
+
+ @Override
+ Stream<TestSetSpec> getTestSetSpecs() {
+ return Stream.of(
+ inetAtonStandardTestCases(),
+ inetAtonShortFormTestCases(),
+ inetAtonSingleNumberTestCases(),
+ inetAtonLeadingZeroTestCases(),
+ inetAtonInvalidInputTestCases(),
+ inetNtoaTestCases(),
+ inetNtoaIntInputTestCases(),
+ inetNtoaSmallintInputTestCases(),
+ inetNtoaTinyintInputTestCases());
+ }
+
+ /** Test standard 4-octet IPv4 addresses. */
+ private TestSetSpec inetAtonStandardTestCases() {
+ return TestSetSpec.forFunction(BuiltInFunctionDefinitions.INET_ATON)
+ .onFieldsWithData(
+ "127.0.0.1", // f0: loopback -> 2130706433
+ "0.0.0.0", // f1: zero -> 0
+ "255.255.255.255", // f2: max -> 4294967295
+ "192.168.1.1", // f3: -> 3232235777
+ "10.0.0.1" // f4: -> 167772161
+ )
+ .andDataTypes(STRING(), STRING(), STRING(), STRING(), STRING())
+ .testResult($("f0").inetAton(), "INET_ATON(f0)", 2130706433L,
BIGINT())
+ .testResult($("f1").inetAton(), "INET_ATON(f1)", 0L, BIGINT())
+ .testResult($("f2").inetAton(), "INET_ATON(f2)", 4294967295L,
BIGINT())
+ .testResult($("f3").inetAton(), "INET_ATON(f3)", 3232235777L,
BIGINT())
+ .testResult($("f4").inetAton(), "INET_ATON(f4)", 167772161L,
BIGINT());
+ }
+
+ /** Test MySQL-compatible short-form IPv4 addresses: a.b -> a.0.0.b, a.b.c
-> a.b.0.c. */
+ private TestSetSpec inetAtonShortFormTestCases() {
+ return TestSetSpec.forFunction(BuiltInFunctionDefinitions.INET_ATON)
+ .onFieldsWithData(
+ "127.1", // f0: short form a.b -> 127.0.0.1 =
2130706433
+ "127.0.1", // f1: short form a.b.c -> 127.0.0.1 =
2130706433
+ "10.1", // f2: short form a.b -> 10.0.0.1 = 167772161
+ "192.168.1", // f3: short form a.b.c -> 192.168.0.1 =
3232235521
+ "1.2" // f4: short form a.b -> 1.0.0.2 = 16777218
+ )
+ .andDataTypes(STRING(), STRING(), STRING(), STRING(), STRING())
+ .testResult($("f0").inetAton(), "INET_ATON(f0)", 2130706433L,
BIGINT())
+ .testResult($("f1").inetAton(), "INET_ATON(f1)", 2130706433L,
BIGINT())
+ .testResult($("f2").inetAton(), "INET_ATON(f2)", 167772161L,
BIGINT())
+ .testResult($("f3").inetAton(), "INET_ATON(f3)", 3232235521L,
BIGINT())
+ .testResult($("f4").inetAton(), "INET_ATON(f4)", 16777218L,
BIGINT());
+ }
+
+ /** Test single-number addresses: value is treated as a direct address
(must be in [0, 255]). */
+ private TestSetSpec inetAtonSingleNumberTestCases() {
+ return TestSetSpec.forFunction(BuiltInFunctionDefinitions.INET_ATON)
+ .onFieldsWithData(
+ "1", // f0: single number -> 1
+ "0", // f1: zero -> 0
+ "255", // f2: max single number -> 255
+ "256", // f3: exceeds 255 -> null
+ "167772161", // f4: exceeds 255 -> null
+ "4294967295" // f5: exceeds 255 -> null
+ )
+ .andDataTypes(STRING(), STRING(), STRING(), STRING(),
STRING(), STRING())
+ .testResult($("f0").inetAton(), "INET_ATON(f0)", 1L, BIGINT())
+ .testResult($("f1").inetAton(), "INET_ATON(f1)", 0L, BIGINT())
+ .testResult($("f2").inetAton(), "INET_ATON(f2)", 255L,
BIGINT())
+ .testResult($("f3").inetAton(), "INET_ATON(f3)", null,
BIGINT().nullable())
+ .testResult($("f4").inetAton(), "INET_ATON(f4)", null,
BIGINT().nullable())
+ .testResult($("f5").inetAton(), "INET_ATON(f5)", null,
BIGINT().nullable());
+ }
+
+ /** Test leading zeros are parsed as decimal (MySQL behavior), not octal.
*/
+ private TestSetSpec inetAtonLeadingZeroTestCases() {
+ return TestSetSpec.forFunction(BuiltInFunctionDefinitions.INET_ATON)
+ .onFieldsWithData(
+ "010.000.000.001", // f0: leading zeros as decimal
(10.0.0.1) = 167772161
+ "192.168.001.001", // f1: leading zeros as decimal
(192.168.1.1) =
+ // 3232235777
+ "001.002.003.004" // f2: leading zeros as decimal
(1.2.3.4) = 16909060
+ )
+ .andDataTypes(STRING(), STRING(), STRING())
+ .testResult($("f0").inetAton(), "INET_ATON(f0)", 167772161L,
BIGINT())
+ .testResult($("f1").inetAton(), "INET_ATON(f1)", 3232235777L,
BIGINT())
+ .testResult($("f2").inetAton(), "INET_ATON(f2)", 16909060L,
BIGINT());
+ }
+
+ /** Test invalid inputs return null. */
+ private TestSetSpec inetAtonInvalidInputTestCases() {
+ return TestSetSpec.forFunction(BuiltInFunctionDefinitions.INET_ATON)
+ .onFieldsWithData(
+ null, // f0: null
+ "", // f1: empty
+ "invalid", // f2: invalid format
+ "256.0.0.1", // f3: octet out of range (256 > 255)
+ "1.2.3.4.5", // f4: extra octet (5 octets)
+ "1.2.3.", // f5: trailing dot
+ ".1.2.3", // f6: leading dot
+ "1..2.3", // f7: double dot
+ " 127.0.0.1", // f8: leading space (no trim)
+ "127.0.0.1 " // f9: trailing space (no trim)
+ )
+ .andDataTypes(
+ STRING(), STRING(), STRING(), STRING(), STRING(),
STRING(), STRING(),
+ STRING(), STRING(), STRING())
+ .testResult($("f0").inetAton(), "INET_ATON(f0)", null,
BIGINT().nullable())
+ .testResult($("f1").inetAton(), "INET_ATON(f1)", null,
BIGINT().nullable())
+ .testResult($("f2").inetAton(), "INET_ATON(f2)", null,
BIGINT().nullable())
+ .testResult($("f3").inetAton(), "INET_ATON(f3)", null,
BIGINT().nullable())
+ .testResult($("f4").inetAton(), "INET_ATON(f4)", null,
BIGINT().nullable())
+ .testResult($("f5").inetAton(), "INET_ATON(f5)", null,
BIGINT().nullable())
+ .testResult($("f6").inetAton(), "INET_ATON(f6)", null,
BIGINT().nullable())
+ .testResult($("f7").inetAton(), "INET_ATON(f7)", null,
BIGINT().nullable())
+ .testResult($("f8").inetAton(), "INET_ATON(f8)", null,
BIGINT().nullable())
+ .testResult($("f9").inetAton(), "INET_ATON(f9)", null,
BIGINT().nullable());
+ }
+
+ /** Test INET_NTOA with BIGINT inputs. */
+ private TestSetSpec inetNtoaTestCases() {
+ return TestSetSpec.forFunction(BuiltInFunctionDefinitions.INET_NTOA)
+ .onFieldsWithData(
+ 2130706433L, // f0: 127.0.0.1
+ 0L, // f1: 0.0.0.0
+ 4294967295L, // f2: 255.255.255.255
+ 3232235777L, // f3: 192.168.1.1
+ 167772161L, // f4: 10.0.0.1
+ null, // f5: null
+ -1L, // f6: negative (invalid)
+ 4294967296L // f7: exceeds max (invalid)
+ )
+ .andDataTypes(
+ BIGINT(), BIGINT(), BIGINT(), BIGINT(), BIGINT(),
BIGINT(), BIGINT(),
+ BIGINT())
+ .testResult($("f0").inetNtoa(), "INET_NTOA(f0)", "127.0.0.1",
STRING())
+ .testResult($("f1").inetNtoa(), "INET_NTOA(f1)", "0.0.0.0",
STRING())
+ .testResult($("f2").inetNtoa(), "INET_NTOA(f2)",
"255.255.255.255", STRING())
+ .testResult($("f3").inetNtoa(), "INET_NTOA(f3)",
"192.168.1.1", STRING())
+ .testResult($("f4").inetNtoa(), "INET_NTOA(f4)", "10.0.0.1",
STRING())
+ .testResult($("f5").inetNtoa(), "INET_NTOA(f5)", null,
STRING().nullable())
+ .testResult($("f6").inetNtoa(), "INET_NTOA(f6)", null,
STRING().nullable())
+ .testResult($("f7").inetNtoa(), "INET_NTOA(f7)", null,
STRING().nullable());
+ }
+
+ /** Test INET_NTOA with INT inputs. */
+ private TestSetSpec inetNtoaIntInputTestCases() {
+ return TestSetSpec.forFunction(BuiltInFunctionDefinitions.INET_NTOA)
+ .onFieldsWithData(
+ 2130706433, // f0: 127.0.0.1 (as INT)
+ 0, // f1: 0.0.0.0 (as INT)
+ 167772161, // f2: 10.0.0.1 (as INT)
+ (Integer) null, // f3: null (as INT)
+ -1 // f4: negative -> null (consistent with MySQL)
+ )
+ .andDataTypes(INT(), INT(), INT(), INT(), INT())
+ .testResult($("f0").inetNtoa(), "INET_NTOA(f0)", "127.0.0.1",
STRING())
+ .testResult($("f1").inetNtoa(), "INET_NTOA(f1)", "0.0.0.0",
STRING())
+ .testResult($("f2").inetNtoa(), "INET_NTOA(f2)", "10.0.0.1",
STRING())
+ .testResult($("f3").inetNtoa(), "INET_NTOA(f3)", null,
STRING().nullable())
+ .testResult($("f4").inetNtoa(), "INET_NTOA(f4)", null,
STRING().nullable());
+ }
+
+ /** Test INET_NTOA with SMALLINT inputs. */
+ private TestSetSpec inetNtoaSmallintInputTestCases() {
+ return TestSetSpec.forFunction(BuiltInFunctionDefinitions.INET_NTOA)
+ .onFieldsWithData(
+ (short) 256, // f0: 0.0.1.0
+ (short) 0, // f1: 0.0.0.0
+ (short) 1, // f2: 0.0.0.1
+ (Short) null, // f3: null
+ (short) -1 // f4: negative -> null (consistent with
MySQL)
+ )
+ .andDataTypes(SMALLINT(), SMALLINT(), SMALLINT(), SMALLINT(),
SMALLINT())
+ .testResult($("f0").inetNtoa(), "INET_NTOA(f0)", "0.0.1.0",
STRING())
+ .testResult($("f1").inetNtoa(), "INET_NTOA(f1)", "0.0.0.0",
STRING())
+ .testResult($("f2").inetNtoa(), "INET_NTOA(f2)", "0.0.0.1",
STRING())
+ .testResult($("f3").inetNtoa(), "INET_NTOA(f3)", null,
STRING().nullable())
+ .testResult($("f4").inetNtoa(), "INET_NTOA(f4)", null,
STRING().nullable());
+ }
+
+ /** Test INET_NTOA with TINYINT inputs. */
+ private TestSetSpec inetNtoaTinyintInputTestCases() {
+ return TestSetSpec.forFunction(BuiltInFunctionDefinitions.INET_NTOA)
+ .onFieldsWithData(
+ (byte) 1, // f0: 0.0.0.1
+ (byte) 0, // f1: 0.0.0.0
+ (byte) 127, // f2: 0.0.0.127
+ (Byte) null, // f3: null
+ (byte) -1 // f4: negative -> null (consistent with
MySQL)
+ )
+ .andDataTypes(TINYINT(), TINYINT(), TINYINT(), TINYINT(),
TINYINT())
+ .testResult($("f0").inetNtoa(), "INET_NTOA(f0)", "0.0.0.1",
STRING())
+ .testResult($("f1").inetNtoa(), "INET_NTOA(f1)", "0.0.0.0",
STRING())
+ .testResult($("f2").inetNtoa(), "INET_NTOA(f2)", "0.0.0.127",
STRING())
+ .testResult($("f3").inetNtoa(), "INET_NTOA(f3)", null,
STRING().nullable())
+ .testResult($("f4").inetNtoa(), "INET_NTOA(f4)", null,
STRING().nullable());
+ }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/InetAtonFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/InetAtonFunction.java
new file mode 100644
index 00000000000..b75c48925ba
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/InetAtonFunction.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#INET_ATON}.
+ *
+ * <p>This function converts an IPv4 address string to its numeric
representation. It is partially
+ * compatible with MySQL INET_ATON, including support for short-form IPv4
addresses.
+ *
+ * <p>The conversion formula for a standard IP address A.B.C.D is: A * 256^3 +
B * 256^2 + C * 256 +
+ * D
+ *
+ * <p>MySQL-compatible short-form IPv4 addresses are supported:
+ *
+ * <ul>
+ * <li>a — the value is stored directly as an address (value must be in [0,
255])
+ * <li>a.b — interpreted as a.0.0.b
+ * <li>a.b.c — interpreted as a.b.0.c
+ * <li>a.b.c.d — standard dotted-decimal format
+ * </ul>
+ *
+ * <p>Leading zeros in octets are parsed as decimal (consistent with MySQL),
not octal.
+ *
+ * <p>Note: This function only supports IPv4 addresses. IPv6 addresses are not
supported.
+ *
+ * <p>Examples:
+ *
+ * <ul>
+ * <li>INET_ATON('1') returns 1 (single number)
+ * <li>INET_ATON('127.0.0.1') returns 2130706433
+ * <li>INET_ATON('127.1') returns 2130706433 (short-form: 127.0.0.1)
+ * <li>INET_ATON('127.0.1') returns 2130706433 (short-form: 127.0.0.1)
+ * <li>INET_ATON('10.0.0.1') returns 167772161
+ * <li>INET_ATON('0.0.0.0') returns 0
+ * </ul>
+ */
+@Internal
+public class InetAtonFunction extends BuiltInScalarFunction {
+
+ public InetAtonFunction(SpecializedFunction.SpecializedContext context) {
+ super(BuiltInFunctionDefinitions.INET_ATON, context);
+ }
+
+ /**
+ * Converts an IPv4 address string to its numeric representation.
+ *
+ * @param ipAddress the IPv4 address string in dotted-decimal notation
(supports short-form)
+ * @return the numeric representation of the IP address, or null if input
is null or invalid
+ */
+ public @Nullable Long eval(@Nullable StringData ipAddress) {
+ if (ipAddress == null) {
+ return null;
+ }
+
+ BinaryStringData binaryStr = (BinaryStringData) ipAddress;
+ if (BinaryStringDataUtil.isEmpty(binaryStr)) {
+ return null;
+ }
+
+ return ipToLong(binaryStr);
+ }
+
+ /**
+ * Converts an IPv4 address string to a long value.
+ *
+ * <p>Operates directly on UTF-8 bytes from {@link BinaryStringData} to
avoid unnecessary String
+ * object allocation. Since IPv4 addresses contain only ASCII characters
('0'-'9' and '.'), each
+ * character is exactly one byte in UTF-8 encoding.
+ *
+ * <p>Supports MySQL-compatible short-form addresses:
+ *
+ * <ul>
+ * <li>a — direct value (value must be in [0, 255])
+ * <li>a.b — a.0.0.b (each part must be in [0, 255])
+ * <li>a.b.c — a.b.0.c (each part must be in [0, 255])
+ * <li>a.b.c.d — standard format (each part must be in [0, 255])
+ * </ul>
+ *
+ * <p>Leading zeros are treated as decimal (not octal), consistent with
MySQL behavior.
+ *
+ * @param ip the IPv4 address as BinaryStringData
+ * @return the long value, or null if the IP address is invalid
+ */
+ private static @Nullable Long ipToLong(BinaryStringData ip) {
+ int len = ip.getSizeInBytes();
+ int partCount = 0;
+ int partStart = 0;
+ long[] parts = new long[4];
+
+ for (int i = 0; i <= len; i++) {
+ if (i == len || ip.byteAt(i) == '.') {
+ if (partCount >= 4) {
+ return null;
+ }
+ if (partStart == i) {
+ return null;
+ }
+
+ // Parse number manually from bytes
+ long value = 0;
+ for (int j = partStart; j < i; j++) {
+ byte b = ip.byteAt(j);
+ if (b < '0' || b > '9') {
+ return null;
+ }
+ value = value * 10 + (b - '0');
+ if (value > 255) {
+ return null;
+ }
+ }
+ parts[partCount++] = value;
+ partStart = i + 1;
+ }
+ }
+
+ switch (partCount) {
+ case 1:
+ // Single number: direct value
+ return parts[0];
+ case 2:
+ // a.b -> a.0.0.b
+ return (parts[0] << 24) | parts[1];
+ case 3:
+ // a.b.c -> a.b.0.c
+ return (parts[0] << 24) | (parts[1] << 16) | parts[2];
+ case 4:
+ // a.b.c.d -> standard format
+ return (parts[0] << 24) | (parts[1] << 16) | (parts[2] << 8) |
parts[3];
+ default:
+ return null;
+ }
+ }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/InetNtoaFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/InetNtoaFunction.java
new file mode 100644
index 00000000000..6e2333072a2
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/InetNtoaFunction.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#INET_NTOA}.
+ *
+ * <p>This function converts a numeric IPv4 address representation back to its
string format.
+ *
+ * <p>The conversion extracts each octet from the numeric value using bit
shifting and masking.
+ *
+ * <p>Accepts any integer numeric type (TINYINT, SMALLINT, INT, BIGINT).
Negative values return
+ * null, consistent with MySQL's {@code INET_NTOA(-1) = NULL} behavior.
+ *
+ * <p>Note: This function only supports IPv4 addresses. IPv6 addresses are not
supported.
+ *
+ * <p>Examples:
+ *
+ * <ul>
+ * <li>INET_NTOA(2130706433) returns '127.0.0.1'
+ * <li>INET_NTOA(167772161) returns '10.0.0.1'
+ * <li>INET_NTOA(0) returns '0.0.0.0'
+ * <li>INET_NTOA(-1) returns NULL
+ * </ul>
+ */
+@Internal
+public class InetNtoaFunction extends BuiltInScalarFunction {
+
+ public InetNtoaFunction(SpecializedFunction.SpecializedContext context) {
+ super(BuiltInFunctionDefinitions.INET_NTOA, context);
+ }
+
+ /**
+ * Converts a numeric IPv4 address representation to its string format.
+ *
+ * <p>Accepts any integer numeric type (TINYINT, SMALLINT, INT, BIGINT).
Negative values return
+ * null, consistent with MySQL's {@code INET_NTOA(-1) = NULL} behavior.
+ *
+ * @param ipNumber the numeric representation of the IPv4 address
+ * @return the IPv4 address string in dotted-decimal notation, or null if
input is null,
+ * negative, or out of valid range [0, 4294967295]
+ */
+ public @Nullable StringData eval(@Nullable Number ipNumber) {
+ if (ipNumber == null) {
+ return null;
+ }
+ return longToIp(ipNumber.longValue());
+ }
+
+ /**
+ * Converts a numeric IPv4 address to its dotted-decimal string format.
+ *
+ * @param ipNumber the numeric representation
+ * @return the IPv4 address string, or null if out of valid range [0,
4294967295]
+ */
+ private static @Nullable StringData longToIp(long ipNumber) {
+ if (ipNumber < 0 || ipNumber > 0xFFFFFFFFL) {
+ return null;
+ }
+
+ return StringData.fromString(
+ ((ipNumber >> 24) & 0xFF)
+ + "."
+ + ((ipNumber >> 16) & 0xFF)
+ + "."
+ + ((ipNumber >> 8) & 0xFF)
+ + "."
+ + (ipNumber & 0xFF));
+ }
+}