(flink) branch release-1.18 updated: [FLINK-25565][Formats][Parquet] write and read parquet int64 timestamp (#18304) (#23887)

2023-12-08 Thread thw
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new 4379f2c39fb [FLINK-25565][Formats][Parquet] write and read parquet int64 timestamp (#18304) (#23887)
4379f2c39fb is described below

commit 4379f2c39fbdaeb78f874f6dd461d20b8a8961cd
Author: Thomas Weise 
AuthorDate: Fri Dec 8 13:49:04 2023 -0500

    [FLINK-25565][Formats][Parquet] write and read parquet int64 timestamp (#18304) (#23887)

Co-authored-by: Bo Cui 
---
 .../docs/connectors/table/formats/parquet.md   |  14 ++
 .../docs/connectors/table/formats/parquet.md   |  14 ++
 .../formats/parquet/ParquetFileFormatFactory.java  |  15 ++
 .../parquet/ParquetVectorizedInputFormat.java  |  13 +-
 .../formats/parquet/row/ParquetRowDataBuilder.java |  13 +-
 .../formats/parquet/row/ParquetRowDataWriter.java  |  67 -
 .../parquet/utils/ParquetSchemaConverter.java  |  50 +++-
 .../formats/parquet/vector/ParquetDictionary.java  |  20 +-
 .../parquet/vector/ParquetSplitReaderUtil.java |   3 +-
 .../vector/reader/AbstractColumnReader.java|   2 +-
 .../vector/reader/TimestampColumnReader.java   | 101 +++-
 .../formats/parquet/ParquetTimestampITCase.java| 280 +
 .../parquet/row/ParquetRowDataWriterTest.java  |  14 ++
 .../vector/ParquetInt64TimestampReaderTest.java|  69 +
 .../runtime/stream/FsStreamingSinkITCaseBase.scala |  65 +++--
 15 files changed, 683 insertions(+), 57 deletions(-)
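The heart of the read path is TimestampColumnReader decoding INT64 values into Flink's TimestampData. A minimal sketch of that conversion for the default micros unit (the helper class and method name are invented; TimestampData.fromEpochMillis is the actual Flink API):

    import org.apache.flink.table.data.TimestampData;

    /** Illustrative only: decode a Parquet INT64 timestamp stored in microseconds. */
    public final class Int64MicrosDecodeSketch {

        static TimestampData fromMicros(long micros) {
            long millis = Math.floorDiv(micros, 1_000L);
            int nanosOfMilli = (int) Math.floorMod(micros, 1_000L) * 1_000;
            return TimestampData.fromEpochMillis(millis, nanosOfMilli);
        }

        public static void main(String[] args) {
            // 2023-12-08T00:00:00.000001Z expressed as microseconds since the epoch.
            System.out.println(fromMicros(1_701_993_600_000_001L));
        }
    }
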

diff --git a/docs/content.zh/docs/connectors/table/formats/parquet.md b/docs/content.zh/docs/connectors/table/formats/parquet.md
index 92536d00f98..295b11ef41b 100644
--- a/docs/content.zh/docs/connectors/table/formats/parquet.md
+++ b/docs/content.zh/docs/connectors/table/formats/parquet.md
@@ -84,6 +84,20 @@ Format 参数
   Boolean
  使用 UTC 时区或本地时区在纪元时间和 LocalDateTime 之间进行转换。Hive 0.x/1.x/2.x 使用本地时区,但 Hive 3.x 使用 UTC 时区。
 
+
+  timestamp.time.unit
+  optional
+  micros
+  String
+  根据TimeUnit在Timestamp和int64之间进行转换,可选值nanos/micros/millis。
+
+
+  write.int64.timestamp
+  optional
+  false
+  Boolean
+  以int64替代int96存储parquet Timestamp。注意:Timestamp将与时区无关(从不转换为不同时区)。
+
 
 
 
diff --git a/docs/content/docs/connectors/table/formats/parquet.md b/docs/content/docs/connectors/table/formats/parquet.md
index 0b7ba9c42f5..75c524f238f 100644
--- a/docs/content/docs/connectors/table/formats/parquet.md
+++ b/docs/content/docs/connectors/table/formats/parquet.md
@@ -84,6 +84,20 @@ Format Options
   Boolean
  Use UTC timezone or local timezone to the conversion between epoch time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x use UTC timezone.
 
+
+  timestamp.time.unit
+  optional
+  micros
+  String
+  Store parquet int64/LogicalTypes timestamps in this time unit, value is nanos/micros/millis.
+
+
+  write.int64.timestamp
+  optional
+  false
+  Boolean
+  Write parquet timestamp as int64/LogicalTypes instead of int96/OriginalTypes. Note: Timestamp will be time zone agnostic (NEVER converted to a different time zone).
+
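For reference, a hedged usage sketch of the two new options with the filesystem connector. The table name, schema and path below are invented, and the assumption is that format options are prefixed with the format identifier in the WITH clause, as for other formats:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ParquetInt64TimestampDdlSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Hypothetical table; the two 'parquet.*' keys map to the options documented above.
            tEnv.executeSql(
                    "CREATE TABLE events ("
                            + "  id BIGINT,"
                            + "  ts TIMESTAMP(6)"
                            + ") WITH ("
                            + "  'connector' = 'filesystem',"
                            + "  'path' = 'file:///tmp/events',"
                            + "  'format' = 'parquet',"
                            + "  'parquet.write.int64.timestamp' = 'true',"
                            + "  'parquet.timestamp.time.unit' = 'micros'"
                            + ")");
        }
    }
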
 
 
 
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java
index 14f257899c1..8be727c7947 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java
@@ -68,6 +68,21 @@ public class ParquetFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory
                                 + " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x"
                                 + " use UTC timezone");
 
+    public static final ConfigOption<String> TIMESTAMP_TIME_UNIT =
+            key("timestamp.time.unit")
+                    .stringType()
+                    .defaultValue("micros")
+                    .withDescription(
+                            "Store parquet int64/LogicalTypes timestamps in this time unit, value is nanos/micros/millis");
+
+    public static final ConfigOption<Boolean> WRITE_INT64_TIMESTAMP =
+            key("write.int64.timestamp")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Write parquet timestamp as int64/LogicalTypes instead of int96/OriginalTypes. "
+                                    + "Note: Timestamp will be time zone agnostic (NEVER converted to a different time zone).");
+
 @Override
 public BulkDecodingFormat 
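Both options are plain ConfigOptions, so they can be read from the format's ReadableConfig like any other setting. A minimal sketch (the surrounding helper class and method are invented):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.ReadableConfig;
    import org.apache.flink.formats.parquet.ParquetFileFormatFactory;

    // Illustrative only: reads the two options the way the reader/writer paths would.
    class ParquetTimestampOptionsSketch {
        static void describe(ReadableConfig formatOptions) {
            boolean int64 = formatOptions.get(ParquetFileFormatFactory.WRITE_INT64_TIMESTAMP);
            String unit = formatOptions.get(ParquetFileFormatFactory.TIMESTAMP_TIME_UNIT);
            System.out.println("write.int64.timestamp=" + int64 + ", timestamp.time.unit=" + unit);
        }

        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setString("write.int64.timestamp", "true");
            describe(conf); // prints write.int64.timestamp=true, timestamp.time.unit=micros
        }
    }
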

(flink) 02/02: [FLINK-33777] Fix ParquetTimestampITCase>FsStreamingSinkITCaseBase failing

2023-12-08 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0e515dce78832dbbbf5fce9c8cdd113bbb62cdf0
Author: Sergey Nuyanzin 
AuthorDate: Fri Dec 8 08:51:38 2023 +0100

[FLINK-33777] Fix ParquetTimestampITCase>FsStreamingSinkITCaseBase failing
---
 .../apache/flink/formats/parquet/ParquetTimestampITCase.java |  4 ++--
 .../planner/runtime/stream/FsStreamingSinkITCaseBase.scala   | 12 +---
 2 files changed, 3 insertions(+), 13 deletions(-)
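The renamed expectations follow from dropping the explicit Schema: when a DataStream of tuples is registered without a schema, createTemporaryView derives the column names from the tuple type, i.e. f0, f1, and so on. A small standalone illustration (view and data are invented):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class DefaultTupleFieldNamesSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // No Schema given, so the view exposes the tuple fields as f0 and f1.
            tEnv.createTemporaryView(
                    "my_table", env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b")));

            tEnv.from("my_table").printSchema();
        }
    }
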

diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTimestampITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTimestampITCase.java
index ed267d329d0..90c130e22f5 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTimestampITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTimestampITCase.java
@@ -181,7 +181,7 @@ public class ParquetTimestampITCase extends FsStreamingSinkITCaseBase {
 Types.STRING,
 Types.STRING
 },
-                                        new String[] {"a", "b", "c", "d", "e"})));
+                                        new String[] {"f0", "f1", "f2", "f3", "f4"})));
 }
 
 @Override
@@ -198,7 +198,7 @@ public class ParquetTimestampITCase extends FsStreamingSinkITCaseBase {
 Types.STRING,
 Types.STRING
 },
-                                        new String[] {"a", "b", "c", "d", "e"})));
+                                        new String[] {"f0", "f1", "f2", "f3", "f4"})));
 }
 
 @Override
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala
index c4f3c2c1764..d772d8ace77 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala
@@ -25,8 +25,6 @@ import org.apache.flink.streaming.api.CheckpointingMode
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.{DataTypes, Schema}
-import org.apache.flink.table.api.Expressions.$
 import org.apache.flink.table.data.TimestampData
 import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestSinkUtil}
 import org.apache.flink.testutils.junit.utils.TempDirUtils
@@ -176,15 +174,7 @@ abstract class FsStreamingSinkITCaseBase extends StreamingTestBase {
 
 tEnv.createTemporaryView(
   "my_table",
-  dataStream,
-  Schema
-.newBuilder()
-.column("f0", DataTypes.INT())
-.column("f1", DataTypes.STRING())
-.column("f2", DataTypes.STRING())
-.column("f3", DataTypes.STRING())
-.column("f4", DataTypes.STRING())
-.build()
+  dataStream
 )
 
 val ddl: String = getDDL(



(flink) branch master updated (fcc7bc2e5e5 -> 0e515dce788)

2023-12-08 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


 from fcc7bc2e5e5 [FLINK-33523][table-planner] Revert [FLINK-31835] Fix the array type that can't be converted from the external primitive array"
 new ca72f6302bd [FLINK-33480] Fix GroupAggregate restore tests
 new 0e515dce788 [FLINK-33777] Fix ParquetTimestampITCase>FsStreamingSinkITCaseBase failing

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/formats/parquet/ParquetTimestampITCase.java |  4 ++--
 .../plan/group-aggregate-distinct-mini-batch.json|  4 ++--
 .../plan/group-aggregate-distinct.json   |  4 ++--
 .../plan/group-aggregate-simple-mini-batch.json  |  4 ++--
 .../group-aggregate-simple/plan/group-aggregate-simple.json  |  4 ++--
 .../plan/group-aggregate-udf-with-merge-mini-batch.json  |  4 ++--
 .../plan/group-aggregate-udf-with-merge.json |  4 ++--
 .../plan/group-aggregate-udf-without-merge-mini-batch.json   |  4 ++--
 .../plan/group-aggregate-udf-without-merge.json  |  4 ++--
 .../planner/runtime/stream/FsStreamingSinkITCaseBase.scala   | 12 +---
 10 files changed, 19 insertions(+), 29 deletions(-)



(flink) 01/02: [FLINK-33480] Fix GroupAggregate restore tests

2023-12-08 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ca72f6302bd3d760d2b47cd8b1b8f2e48705117c
Author: bvarghese1 
AuthorDate: Thu Dec 7 12:40:01 2023 -0800

[FLINK-33480] Fix GroupAggregate restore tests

- This commit ca1c7ce48127472a7c7965099f8a7227549f09df has updated the logic
  to generate the primary key name while generating the compiled plan
- This commit updates/regenerates the plans to have updated primary key names
- Related commit: 193b1c68976cdfbd66147278f23d7d427d9b5562
---
 .../plan/group-aggregate-distinct-mini-batch.json | 4 ++--
 .../group-aggregate-distinct/plan/group-aggregate-distinct.json   | 4 ++--
 .../plan/group-aggregate-simple-mini-batch.json   | 4 ++--
 .../group-aggregate-simple/plan/group-aggregate-simple.json   | 4 ++--
 .../plan/group-aggregate-udf-with-merge-mini-batch.json   | 4 ++--
 .../plan/group-aggregate-udf-with-merge.json  | 4 ++--
 .../plan/group-aggregate-udf-without-merge-mini-batch.json| 4 ++--
 .../plan/group-aggregate-udf-without-merge.json   | 4 ++--
 8 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/group-aggregate-distinct-mini-batch/plan/group-aggregate-distinct-mini-batch.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/group-aggregate-distinct-mini-batch/plan/group-aggregate-distinct-mini-batch.json
index 8dceb51f726..f52bc4055c5 100644
--- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/group-aggregate-distinct-mini-batch/plan/group-aggregate-distinct-mini-batch.json
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/group-aggregate-distinct-mini-batch/plan/group-aggregate-distinct-mini-batch.json
@@ -547,7 +547,7 @@
 } ],
 "watermarkSpecs" : [ ],
 "primaryKey" : {
-  "name" : "PK_132",
+  "name" : "PK_e",
   "type" : "PRIMARY_KEY",
   "columns" : [ "e" ]
 }
@@ -618,4 +618,4 @@
 },
 "shuffleMode" : "PIPELINED"
   } ]
-}
\ No newline at end of file
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/group-aggregate-distinct/plan/group-aggregate-distinct.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/group-aggregate-distinct/plan/group-aggregate-distinct.json
index b1801290fb3..eb0f5e19536 100644
--- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/group-aggregate-distinct/plan/group-aggregate-distinct.json
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/group-aggregate-distinct/plan/group-aggregate-distinct.json
@@ -299,7 +299,7 @@
 } ],
 "watermarkSpecs" : [ ],
 "primaryKey" : {
-  "name" : "PK_132",
+  "name" : "PK_e",
   "type" : "PRIMARY_KEY",
   "columns" : [ "e" ]
 }
@@ -356,4 +356,4 @@
 },
 "shuffleMode" : "PIPELINED"
   } ]
-}
\ No newline at end of file
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/group-aggregate-simple-mini-batch/plan/group-aggregate-simple-mini-batch.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/group-aggregate-simple-mini-batch/plan/group-aggregate-simple-mini-batch.json
index 6ada7bb67e8..5e56eb25708 100644
--- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/group-aggregate-simple-mini-batch/plan/group-aggregate-simple-mini-batch.json
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/group-aggregate-simple-mini-batch/plan/group-aggregate-simple-mini-batch.json
@@ -277,7 +277,7 @@
 } ],
 "watermarkSpecs" : [ ],
 "primaryKey" : {
-  "name" : "PK_129",
+  "name" : "PK_b",
   "type" : "PRIMARY_KEY",
   "columns" : [ "b" ]
 }
@@ -348,4 +348,4 @@
 },
 "shuffleMode" : "PIPELINED"
   } ]
-}
\ No newline at end of file
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/group-aggregate-simple/plan/group-aggregate-simple.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/group-aggregate-simple/plan/group-aggregate-simple.json
index 

(flink-connector-shared-utils) branch ci_utils updated: [FLINK-33776] Allow to specify optional profile for connectors

2023-12-08 Thread snuyanzin
This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch ci_utils
in repository https://gitbox.apache.org/repos/asf/flink-connector-shared-utils.git


The following commit(s) were added to refs/heads/ci_utils by this push:
 new 11be5c6  [FLINK-33776] Allow to specify optional profile for connectors
11be5c6 is described below

commit 11be5c693081aba252ab39576ec1c6f7fde5765c
Author: Sergey Nuyanzin 
AuthorDate: Fri Dec 8 13:17:30 2023 +0100

[FLINK-33776] Allow to specify optional profile for connectors
---
 .github/workflows/_testing.yml | 7 +++
 .github/workflows/ci.yml   | 8 
 2 files changed, 15 insertions(+)

diff --git a/.github/workflows/_testing.yml b/.github/workflows/_testing.yml
index cbb9eaa..aca4d74 100644
--- a/.github/workflows/_testing.yml
+++ b/.github/workflows/_testing.yml
@@ -45,6 +45,13 @@ jobs:
   flink_version: 1.16.1
   connector_branch: ci_utils
   run_dependency_convergence: false
+  optional_maven_profile:
+uses: ./.github/workflows/ci.yml
+with:
+  flink_version: 1.16-SNAPSHOT
+  connector_branch: ci_utils
+  jdk_version: 11
+  optional_maven_profiles: "java11,java11-target"
   multiple-branches:
 strategy:
   matrix:
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index cfd9752..e9d1fcc 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -55,6 +55,10 @@ on:
 description: "Branch that need to be checked out"
 required: false
 type: string
+  optional_maven_profiles:
+description: "Optional maven profiles."
+required: false
+type: string
 
 jobs:
   compile_and_test:
@@ -93,6 +97,10 @@ jobs:
 if: ${{ inputs.run_dependency_convergence }}
         run: echo "MVN_DEPENDENCY_CONVERGENCE=-Dflink.convergence.phase=install -Pcheck-convergence" >> $GITHUB_ENV
 
+  - name: "Enable optional maven profiles"
+if: ${{ inputs.optional_maven_profiles }}
+        run: echo "MVN_COMMON_OPTIONS=${MVN_COMMON_OPTIONS} -P ${{ inputs.optional_maven_profiles }}" >> $GITHUB_ENV
+
   - name: "Determine Flink binary url"
 run: |
   binary_url=${{ inputs.flink_url }}



(flink-connector-shared-utils) branch ci_utils updated: [FLINK-33302] Allow to customize jdk version for connectors

2023-12-08 Thread snuyanzin
This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch ci_utils
in repository https://gitbox.apache.org/repos/asf/flink-connector-shared-utils.git


The following commit(s) were added to refs/heads/ci_utils by this push:
 new 835ba96  [FLINK-33302] Allow to customize jdk version for connectors
835ba96 is described below

commit 835ba9613a0f488172cf71a8fa4dfcd2ded529a5
Author: Sergey Nuyanzin 
AuthorDate: Fri Dec 8 11:21:40 2023 +0100

[FLINK-33302] Allow to customize jdk version for connectors
---
 .github/workflows/_testing.yml | 7 +++
 .github/workflows/ci.yml   | 7 ++-
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/_testing.yml b/.github/workflows/_testing.yml
index ba3bfd3..cbb9eaa 100644
--- a/.github/workflows/_testing.yml
+++ b/.github/workflows/_testing.yml
@@ -32,6 +32,13 @@ jobs:
 with:
   flink_version: 1.16-SNAPSHOT
   connector_branch: ci_utils
+  flink118-java17-version:
+uses: ./.github/workflows/ci.yml
+with:
+  flink_version: 1.18.0
+  jdk_version: 17
+  connector_branch: ci_utils
+  run_dependency_convergence: false
   disable-convergence:
 uses: ./.github/workflows/ci.yml
 with:
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 1f9a58a..cfd9752 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -27,6 +27,11 @@ on:
 description: "Flink version to test against."
 required: true
 type: string
+  jdk_version:
+description: "Jdk version to test against."
+required: false
+default: 8, 11
+type: string
   cache_flink_binary:
        description: "Whether to cache the Flink binary. If not set this parameter is inferred from the version parameter. Must be set if 'flink_url' is used."
 required: false
@@ -56,7 +61,7 @@ jobs:
 runs-on: ubuntu-latest
 strategy:
   matrix:
-jdk: [8, 11]
+jdk: ${{ fromJSON(format('[{0}]', inputs.jdk_version)) }}
 timeout-minutes: ${{ inputs.timeout_global }}
 env:
      MVN_COMMON_OPTIONS: -U -B --no-transfer-progress -Dflink.version=${{ inputs.flink_version }}



(flink) branch release-1.18 updated: [FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle binary literal

2023-12-08 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new 265d8174165 [FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle binary literal
265d8174165 is described below

commit 265d81741654ff70836485e8e8d31ce33a87e960
Author: zoudan 
AuthorDate: Thu Oct 19 17:48:05 2023 +0800

    [FLINK-33313][table] Fix RexNodeExtractor#extractConjunctiveConditions throws an Exception when handle binary literal

Close apache/flink#23551
---
 .../table/planner/plan/utils/RexNodeExtractor.scala |  4 
 .../plan/utils/NestedProjectionUtilTest.scala   | 10 ++
 .../planner/plan/utils/RexNodeExtractorTest.scala   | 21 +
 .../planner/plan/utils/RexNodeRewriterTest.scala|  7 ---
 .../table/planner/plan/utils/RexNodeTestBase.scala  |  6 --
 5 files changed, 39 insertions(+), 9 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
index a7cbb4a9ffc..4b57796e792 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
@@ -36,6 +36,7 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical.YearMonthIntervalType
 import org.apache.flink.util.Preconditions
 
+import org.apache.calcite.avatica.util.ByteString
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.{SqlFunction, SqlKind, SqlPostfixOperator}
@@ -489,6 +490,9 @@ class RexNodeToExpressionConverter(
 // convert to BigDecimal
 literal.getValueAs(classOf[java.math.BigDecimal])
 
+  case BINARY | VARBINARY =>
+literal.getValueAs(classOf[Array[Byte]])
+
   case _ =>
 literal.getValue
 }
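The new BINARY | VARBINARY branch reads the Calcite literal back as a byte array via getValueAs, as the hunk above shows. A standalone sketch of that call, using Calcite's JavaTypeFactoryImpl in place of Flink's planner type factory:

    import org.apache.calcite.avatica.util.ByteString;
    import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
    import org.apache.calcite.rex.RexBuilder;
    import org.apache.calcite.rex.RexLiteral;

    public class BinaryLiteralSketch {
        public static void main(String[] args) {
            RexBuilder rexBuilder = new RexBuilder(new JavaTypeFactoryImpl());
            // X'616263', i.e. the bytes of "abc".
            RexLiteral literal = rexBuilder.makeBinaryLiteral(ByteString.of("616263", 16));
            byte[] bytes = literal.getValueAs(byte[].class);
            System.out.println(new String(bytes));
        }
    }
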
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
index ec8214f5b91..9cd44c9fea7 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/NestedProjectionUtilTest.scala
@@ -87,12 +87,13 @@ class NestedProjectionUtilTest extends RexNodeTestBase {
   "$2",
   "$3",
   "$4",
+  "$5",
   "*($t2, $t3)",
   "100",
-  "<($t5, $t6)",
+  "<($t6, $t7)",
   "6",
-  ">($t1, $t8)",
-  "AND($t7, $t9)")))
+  ">($t1, $t9)",
+  "AND($t8, $t10)")))
 
     val nestedField = NestedProjectionUtil.build(exprs, rexProgram.getInputRowType)
 val paths = NestedProjectionUtil.convertToIndexArray(nestedField)
@@ -101,7 +102,8 @@ class NestedProjectionUtilTest extends RexNodeTestBase {
   Array(1),
   Array(2),
   Array(3),
-  Array(4)
+  Array(4),
+  Array(5)
 )
 assertArray(paths, orderedPaths)
 val builder = new FlinkRexBuilder(typeFactory)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
index 2eb87e35cc8..bd5f15d3bed 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
@@ -33,6 +33,7 @@ import org.apache.flink.table.planner.utils.{DateTimeTestUtil, IntSumAggFunction
 import org.apache.flink.table.resource.ResourceManager
 import org.apache.flink.table.utils.CatalogManagerMocks
 
+import org.apache.calcite.avatica.util.ByteString
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
@@ -145,6 +146,26 @@ class RexNodeExtractorTest extends RexNodeTestBase {
 assertEquals(0, unconvertedRexNodes.length)
   }
 
+  @Test
+  def testExtractConditionWithBinaryLiteral(): Unit = {
+// blob
+val t0 = rexBuilder.makeInputRef(allFieldTypes.get(5), 5)
+
+// X'616263'
+val t1 = rexBuilder.makeBinaryLiteral(ByteString.of("616263", 16))
+
+// blob = X'616263'
+val 

(flink) branch release-1.18 updated: Revert "[FLINK-31835][table-planner] Fix the array type that can't be converted from the external primitive array"

2023-12-08 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new 674d3a74641 Revert "[FLINK-31835][table-planner] Fix the array type that can't be converted from the external primitive array"
674d3a74641 is described below

commit 674d3a746417ceb36f74ccebb6a713549af59033
Author: Martijn Visser <2989614+martijnvis...@users.noreply.github.com>
AuthorDate: Thu Dec 7 16:43:23 2023 +0100

    Revert "[FLINK-31835][table-planner] Fix the array type that can't be converted from the external primitive array"

This reverts commit a6adbdda0cdf90635f0cd7a3427486bced301fbd.
---
 .../flink/table/types/CollectionDataType.java  | 24 +
 .../org/apache/flink/table/types/DataTypeTest.java | 14 -
 .../planner/functions/CastFunctionITCase.java  |  2 +-
 .../planner/runtime/stream/sql/FunctionITCase.java | 60 --
 .../planner/runtime/stream/table/ValuesITCase.java | 18 +--
 .../table/data/DataStructureConvertersTest.java| 24 +
 .../flink/table/test/TableAssertionTest.java   |  2 +-
 7 files changed, 7 insertions(+), 137 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
index 9188530a4ff..239e36eb201 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
@@ -26,8 +26,6 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.commons.lang3.ClassUtils;
-
 import javax.annotation.Nullable;
 
 import java.lang.reflect.Array;
@@ -120,27 +118,9 @@ public final class CollectionDataType extends DataType {
 // arrays are a special case because their default conversion class 
depends on the
 // conversion class of the element type
 if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && clazz == 
null) {
-Class conversionClass =
-wrapOrUnWrap(
-elementDataType.getConversionClass(),
-elementDataType.getLogicalType().isNullable());
-
-return Array.newInstance(conversionClass, 0).getClass();
-}
-return wrapOrUnWrap(clazz, 
elementDataType.getLogicalType().isNullable());
-}
-
-private static Class wrapOrUnWrap(@Nullable Class source, boolean 
nullable) {
-if (source == null) {
-return null;
-}
-if (nullable) {
-return source.isPrimitive() ? 
ClassUtils.primitiveToWrapper(source) : source;
-} else {
-return ClassUtils.isPrimitiveWrapper(source)
-? ClassUtils.wrapperToPrimitive(source)
-: source;
+return Array.newInstance(elementDataType.getConversionClass(), 
0).getClass();
 }
+return clazz;
 }
 
 private DataType updateInnerDataType(DataType elementDataType) {
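With the pre-FLINK-31835 behavior restored above, the conversion class of an array type again comes straight from the element's conversion class; callers that want a primitive array ask for it explicitly. A hedged illustration using the public DataTypes API:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;

    public class ArrayConversionClassSketch {
        public static void main(String[] args) {
            // Default conversion class is derived from the element type.
            DataType boxed = DataTypes.ARRAY(DataTypes.INT());

            // A primitive int[] conversion class is requested explicitly via bridgedTo(...).
            DataType primitive = DataTypes.ARRAY(DataTypes.INT().notNull()).bridgedTo(int[].class);

            System.out.println(boxed.getConversionClass());
            System.out.println(primitive.getConversionClass());
        }
    }
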
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
index ce9239266f2..a7d824009bf 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
@@ -24,8 +24,6 @@ import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.types.inference.TypeTransformations;
-import org.apache.flink.table.types.utils.DataTypeUtils;
 
 import org.junit.jupiter.api.Test;
 
@@ -222,16 +220,4 @@ class DataTypeTest {
 assertThat(DataType.getFields(ARRAY(INT(.isEmpty();
 assertThat(DataType.getFields(INT())).isEmpty();
 }
-
-@Test
-void testArrayConversionClass() {
-assertThat(DataTypes.ARRAY(INT())).hasConversionClass(Integer[].class);
-
assertThat(DataTypes.ARRAY(INT().notNull())).hasConversionClass(int[].class);
-DataType type = DataTypes.ARRAY(INT());
-assertThat(DataTypeUtils.transform(type, 
TypeTransformations.toNullable()))
-.hasConversionClass(Integer[].class);
-type = DataTypes.ARRAY(INT()).bridgedTo(int[].class);
-assertThat(DataTypeUtils.transform(type, 
TypeTransformations.toNullable()))
-.hasConversionClass(int[].class);
-

(flink) branch master updated: [FLINK-33523][table-planner] Revert [FLINK-31835] Fix the array type that can't be converted from the external primitive array"

2023-12-08 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new fcc7bc2e5e5 [FLINK-33523][table-planner] Revert [FLINK-31835] Fix the array type that can't be converted from the external primitive array"
fcc7bc2e5e5 is described below

commit fcc7bc2e5e529b135aee28d732b8c5a6769afdae
Author: Martijn Visser <2989614+martijnvis...@users.noreply.github.com>
AuthorDate: Thu Dec 7 15:16:37 2023 +0100

    [FLINK-33523][table-planner] Revert [FLINK-31835] Fix the array type that can't be converted from the external primitive array"

This reverts commit a6adbdda
---
 .../flink/table/types/CollectionDataType.java  | 24 +
 .../org/apache/flink/table/types/DataTypeTest.java | 14 -
 .../planner/functions/CastFunctionITCase.java  |  2 +-
 .../planner/runtime/stream/sql/FunctionITCase.java | 60 --
 .../planner/runtime/stream/table/ValuesITCase.java | 18 +--
 .../table/data/DataStructureConvertersTest.java| 24 +
 .../flink/table/test/TableAssertionTest.java   |  2 +-
 7 files changed, 7 insertions(+), 137 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
index 9188530a4ff..239e36eb201 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
@@ -26,8 +26,6 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.commons.lang3.ClassUtils;
-
 import javax.annotation.Nullable;
 
 import java.lang.reflect.Array;
@@ -120,27 +118,9 @@ public final class CollectionDataType extends DataType {
 // arrays are a special case because their default conversion class 
depends on the
 // conversion class of the element type
 if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && clazz == 
null) {
-Class conversionClass =
-wrapOrUnWrap(
-elementDataType.getConversionClass(),
-elementDataType.getLogicalType().isNullable());
-
-return Array.newInstance(conversionClass, 0).getClass();
-}
-return wrapOrUnWrap(clazz, 
elementDataType.getLogicalType().isNullable());
-}
-
-private static Class wrapOrUnWrap(@Nullable Class source, boolean 
nullable) {
-if (source == null) {
-return null;
-}
-if (nullable) {
-return source.isPrimitive() ? 
ClassUtils.primitiveToWrapper(source) : source;
-} else {
-return ClassUtils.isPrimitiveWrapper(source)
-? ClassUtils.wrapperToPrimitive(source)
-: source;
+return Array.newInstance(elementDataType.getConversionClass(), 
0).getClass();
 }
+return clazz;
 }
 
 private DataType updateInnerDataType(DataType elementDataType) {
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
index ce9239266f2..a7d824009bf 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
@@ -24,8 +24,6 @@ import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.types.inference.TypeTransformations;
-import org.apache.flink.table.types.utils.DataTypeUtils;
 
 import org.junit.jupiter.api.Test;
 
@@ -222,16 +220,4 @@ class DataTypeTest {
 assertThat(DataType.getFields(ARRAY(INT(.isEmpty();
 assertThat(DataType.getFields(INT())).isEmpty();
 }
-
-@Test
-void testArrayConversionClass() {
-assertThat(DataTypes.ARRAY(INT())).hasConversionClass(Integer[].class);
-
assertThat(DataTypes.ARRAY(INT().notNull())).hasConversionClass(int[].class);
-DataType type = DataTypes.ARRAY(INT());
-assertThat(DataTypeUtils.transform(type, 
TypeTransformations.toNullable()))
-.hasConversionClass(Integer[].class);
-type = DataTypes.ARRAY(INT()).bridgedTo(int[].class);
-assertThat(DataTypeUtils.transform(type, 
TypeTransformations.toNullable()))
-.hasConversionClass(int[].class);
-}
 }
diff --git