[jira] [Commented] (FLINK-34036) Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with Hadoop 3.1.3 enabled

2024-01-08 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804621#comment-17804621
 ] 

Matthias Pohl commented on FLINK-34036:
---

It could be a configuration issue in the GHA workflows, but I would assume 
that this would then also affect the 1.18 nightly workflow. Additionally, I 
didn't touch the Hadoop workflow config around the time the issue started to 
appear. The only cause I can think of is a rebase onto a more recent version 
of {{master}}.

> Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with 
> Hadoop 3.1.3 enabled
> --
>
>                 Key: FLINK-34036
>                 URL: https://issues.apache.org/jira/browse/FLINK-34036
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hadoop Compatibility, Connectors / Hive
>    Affects Versions: 1.19.0
>            Reporter: Matthias Pohl
>            Priority: Major
>              Labels: github-actions, test-stability
>
> The following {{HiveDialectQueryITCase}} tests fail consistently in the 
> FLINK-27075 GitHub Actions [master nightly 
> workflow|https://github.com/XComp/flink/actions/workflows/nightly-dev.yml] of 
> Flink but not the [release-1.18 
> workflow|https://github.com/XComp/flink/actions/workflows/nightly-current.yml]:
> * {{testInsertDirectory}}
> * {{testCastTimeStampToDecimal}}
> * {{testNullLiteralAsArgument}}
> {code}
> Error: 03:38:45 03:38:45.661 [ERROR] Tests run: 22, Failures: 1, Errors: 2, 
> Skipped: 0, Time elapsed: 379.0 s <<< FAILURE! -- in 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase
> Error: 03:38:45 03:38:45.662 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument
>  -- Time elapsed: 0.069 s <<< ERROR!
> Jan 09 03:38:45 java.lang.NoSuchMethodError: 
> org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.getTimestamp(Ljava/lang/Object;Lorg/apache/hadoop/hive/serde2/objectinspector/PrimitiveObjectInspector;)Ljava/sql/Timestamp;
> Jan 09 03:38:45   at 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument(HiveDialectQueryITCase.java:959)
> Jan 09 03:38:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 09 03:38:45 
> Error: 03:38:45 03:38:45.662 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal
>  -- Time elapsed: 0.007 s <<< ERROR!
> Jan 09 03:38:45 org.apache.flink.table.api.ValidationException: Table with 
> identifier 'test-catalog.default.t1' does not exist.
> Jan 09 03:38:45   at 
> org.apache.flink.table.catalog.CatalogManager.dropTableInternal(CatalogManager.java:1266)
> Jan 09 03:38:45   at 
> org.apache.flink.table.catalog.CatalogManager.dropTable(CatalogManager.java:1206)
> Jan 09 03:38:45   at 
> org.apache.flink.table.operations.ddl.DropTableOperation.execute(DropTableOperation.java:74)
> Jan 09 03:38:45   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107)
> Jan 09 03:38:45   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
> Jan 09 03:38:45   at 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal(HiveDialectQueryITCase.java:835)
> Jan 09 03:38:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 09 03:38:45 
> Error: 03:38:45 03:38:45.663 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory 
> -- Time elapsed: 7.326 s <<< FAILURE!
> Jan 09 03:38:45 org.opentest4j.AssertionFailedError: 
> Jan 09 03:38:45 
> Jan 09 03:38:45 expected: "A:english=90#math=100#history=85"
> Jan 09 03:38:45  but was: "A:english=90math=100history=85"
> Jan 09 03:38:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Jan 09 03:38:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Jan 09 03:38:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Jan 09 03:38:45   at 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory(HiveDialectQueryITCase.java:498)
> Jan 09 03:38:45   at java.lang.reflect.Method.invoke(Method.java:498)
> {code}
> The most recent GHA workflow failures are:
> * 
> https://github.com/XComp/flink/actions/runs/7455836411/job/20285758541#step:12:23332
> * 
> https://github.com/XComp/flink/actions/runs/7447254277/job/20259593089#step:12:23378
> * 
> https://github.com/XComp/flink/actions/runs/7442459819/job/20246101021#step:12:23332
> * 
> https://github.com/XComp/flink/actions/runs/7438111934/job/20236674470#step:12:23375
> * 
> 

Re: [PR] [FLINK-33583][table-planner] Support state ttl hint for regular join [flink]

2024-01-08 Thread via GitHub


xuyangzhong commented on code in PR #23752:
URL: https://github.com/apache/flink/pull/23752#discussion_r1445736898


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java:
##
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.hints.stream;
+
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.utils.PlanKind;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import scala.Enumeration;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link org.apache.flink.table.planner.hint.StateTtlHint}. */
+class StateTtlHintTest extends TableTestBase {
+
+    protected StreamTableTestUtil util;
+
+    @BeforeEach
+    void before() {
+        util = streamTestUtil(TableConfig.getDefault());
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE T1 (\n"
+                                + "  a1 BIGINT,\n"
+                                + "  b1 VARCHAR\n"
+                                + ") WITH (\n"
+                                + " 'connector' = 'values'\n"
+                                + ")");
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE T2 (\n"
+                                + "  a2 BIGINT,\n"
+                                + "  b2 VARCHAR\n"
+                                + ") WITH (\n"
+                                + " 'connector' = 'values'\n"
+                                + ")");
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE T3 (\n"
+                                + "  a3 BIGINT,\n"
+                                + "  b3 VARCHAR\n"
+                                + ") WITH (\n"
+                                + " 'connector' = 'values'\n"
+                                + ")");
+
+        util.tableEnv().executeSql("CREATE View V4 as select a3 as a4, b3 as b4 from T3");
+
+        util.tableEnv()
+                .executeSql("create view V5 as select T1.* from T1 join T2 on T1.a1 = T2.a2");
+    }
+
+    @Test
+    void testSimpleJoinStateTtlHintWithEachSide() {
+        String sql =
+                "select /*+ STATE_TTL('T2' = '2d', 'T1' = '1d') */* from T1 join T2 on T1.a1 = T2.a2";
+        verify(sql);
+    }
+
+    @Test
+    void testJoinStateTtlHintOnlyWithRightSide() {
+        String sql = "select /*+ STATE_TTL('T2' = '2d') */* from T1 join T2 on T1.a1 = T2.a2";
+        verify(sql);
+    }
+
+    @Test
+    void testJoinStateTtlHintWithContinuousJoin() {
+        String sql =
+                "select /*+ STATE_TTL('T2' = '2d', 'T3' = '3d', 'T1' = '1d') */* from T1, T2, T3 where T1.a1 = T2.a2 and T2.b2 = T3.b3";
+        verify(sql);
+    }
+
+    @Test
+    void testJoinStateTtlHintWithMultiLevelJoin() {
+        String sql =
+                "select /*+ STATE_TTL('T2' = '2d', 'T3' = '3d', 'T1' = '1d') */* from T1 "
+                        + "join (select T2.* from T2 join T3 on T2.b2 = T3.b3) TMP on T1.a1 = TMP.b2";
+        assertThatThrownBy(() -> verify(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The options of following hints cannot match the name of input tables or views: \n`%s` in `%s`",
+                        "T2, T3", "STATE_TTL");
+    }
+
+    @Test
+    void testJoinStateTtlHintWithOneUnknownTable() {
+        String sql =
+                "select /*+ STATE_TTL('T5' = '2d', 'T1' = '1d') */* from T1 join T2 on T1.a1 = T2.a2";
+
+        assertThatThrownBy(() -> verify(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The options of following hints cannot match the name of input tables or views: \n`%s` in `%s`",
+   

[jira] [Commented] (FLINK-34036) Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with Hadoop 3.1.3 enabled

2024-01-08 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804615#comment-17804615
 ] 

Matthias Pohl commented on FLINK-34036:
---

The error for {{testInsertDirectory}} looks like an encoding issue:
{code}
Jan 09 03:38:45 expected: "A:english=90#math=100#history=85"
Jan 09 03:38:45  but was: "A:english=90math=100history=85"
{code}
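
One way to tell a dropped delimiter from one that is merely invisible in the log is to print both strings with every non-printable character escaped. The following is only a sketch: {{VisibleChars}} is a hypothetical helper, not part of Flink or Hive, and the control character in the sample "actual" value is an assumption made up for illustration, not something taken from the build output.

```java
/** Hypothetical debugging helper; not part of Flink or Hive. */
final class VisibleChars {

    /** Returns the input with every character outside printable ASCII written as a hex escape. */
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c >= 0x20 && c < 0x7f) {
                sb.append(c);
            } else {
                // Make control and non-ASCII characters visible as a four-digit hex escape.
                sb.append(String.format("\\u%04x", (int) c));
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String expected = "A:english=90#math=100#history=85";
        // Assumption for illustration only: a control character where '#' was expected.
        String actual = "A:english=90" + (char) 2 + "math=100" + (char) 2 + "history=85";
        System.out.println(escape(expected));
        System.out.println(escape(actual));
    }
}
```

If the escaped actual value still shows no separator between the map entries, the delimiter is really missing rather than lost when the log was rendered.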


[jira] [Commented] (FLINK-33414) MiniClusterITCase.testHandleStreamingJobsWhenNotEnoughSlot fails due to unexpected TimeoutException

2024-01-08 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804616#comment-17804616
 ] 

Sergey Nuyanzin commented on FLINK-33414:
-

I don't know; it appeared only today during the nightly run, which is 
scheduled once every 24 hours. 
I suspect it is a consequence of merging the related PR yesterday.

> MiniClusterITCase.testHandleStreamingJobsWhenNotEnoughSlot fails due to 
> unexpected TimeoutException
> ---
>
>                 Key: FLINK-33414
>                 URL: https://issues.apache.org/jira/browse/FLINK-33414
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.19.0
>            Reporter: Matthias Pohl
>            Assignee: Jiang Xin
>            Priority: Critical
>              Labels: github-actions, pull-request-available, test-stability
>             Fix For: 1.19.0
>
>
> We see this test instability in [this 
> build|https://github.com/XComp/flink/actions/runs/6695266358/job/18192039035#step:12:9253].
> {code:java}
> Error: 17:04:52 17:04:52.042 [ERROR] Failures: 
> Error: 17:04:52 17:04:52.042 [ERROR]   
> MiniClusterITCase.testHandleStreamingJobsWhenNotEnoughSlot:120 
> Oct 30 17:04:52 Expecting a throwable with root cause being an instance 
> of:
> Oct 30 17:04:52   
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException
> Oct 30 17:04:52 but was an instance of:
> Oct 30 17:04:52   java.util.concurrent.TimeoutException: Timeout has 
> occurred: 100 ms
> Oct 30 17:04:52   at 
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
> Oct 30 17:04:52   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> Oct 30 17:04:52   at 
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> Oct 30 17:04:52   ...(27 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed) {code}
> The same error occurred in the [finegrained_resourcemanager stage of this 
> build|https://github.com/XComp/flink/actions/runs/6468655160/job/17563927249#step:11:26516]
>  (as reported in FLINK-33245).
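
The assertion quoted above is about the root cause, that is, the last exception in the cause chain. A minimal plain-Java sketch of that lookup follows. {{RootCause}} is a hypothetical helper, the wrapper exceptions in {{main}} are made up for illustration, and this is not the assertion library's own implementation.

```java
import java.util.concurrent.TimeoutException;

/** Hypothetical helper that walks a cause chain; not part of Flink or AssertJ. */
final class RootCause {

    /** Returns the innermost cause of the given throwable, or the throwable itself. */
    static Throwable of(Throwable t) {
        Throwable current = t;
        // Stop at the end of the chain and guard against a self-referencing cause.
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Illustrative chain only: the wrapper types are assumptions, not the real job failure.
        Throwable failure =
                new RuntimeException(
                        "job failed",
                        new IllegalStateException(
                                "slot request failed",
                                new TimeoutException("Timeout has occurred: 100 ms")));
        System.out.println(of(failure).getClass().getName());
    }
}
```

A test that expects one root cause type fails as soon as a different exception ends the chain, which matches the shape of the report above.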



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34036) Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with Hadoop 3.1.3 enabled

2024-01-08 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804617#comment-17804617
 ] 

Matthias Pohl commented on FLINK-34036:
---

{{testNullLiteralAsArgument}} sounds like a classpath issue:
{code}
Jan 09 03:38:45 java.lang.NoSuchMethodError: 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.getTimestamp(Ljava/lang/Object;Lorg/apache/hadoop/hive/serde2/objectinspector/PrimitiveObjectInspector;)Ljava/sql/Timestamp;
{code}
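
The descriptor in a {{NoSuchMethodError}} includes the return type, so the error is also raised when the class found at runtime has a method of that name with a different return type. To check which jar provides the class and which overloads it has, something like the following could be run on the test classpath. This is only a sketch: {{ClasspathProbe}} is a hypothetical helper, and the defaults are JDK names so that it runs without Hive; the class and method from the stack trace would be passed as arguments.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

/** Hypothetical diagnostic helper; not part of Flink or Hive. */
final class ClasspathProbe {

    /** Returns the location the class was loaded from, or a marker if none is recorded. */
    static String locationOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        return source == null ? "<no code source>" : String.valueOf(source.getLocation());
    }

    /** Prints every public method with the given name (return type included) and counts them. */
    static int printOverloads(Class<?> clazz, String methodName) {
        int count = 0;
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals(methodName)) {
                System.out.println(m);
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Defaults are JDK names so the sketch runs anywhere; pass the class and
        // method name from the stack trace as arguments to inspect the real classpath.
        String className = args.length > 0 ? args[0] : "java.lang.String";
        String methodName = args.length > 1 ? args[1] : "valueOf";
        Class<?> clazz = Class.forName(className);
        System.out.println(className + " loaded from " + locationOf(clazz));
        printOverloads(clazz, methodName);
    }
}
```

Comparing the printed location and signatures between the {{master}} and 1.18 builds would show whether the two resolve the class from different jars.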


[jira] [Created] (FLINK-34037) FLIP-398: Improve Serialization Configuration And Usage In Flink

2024-01-08 Thread Fang Yong (Jira)
Fang Yong created FLINK-34037:
-

             Summary: FLIP-398: Improve Serialization Configuration And Usage In Flink
                 Key: FLINK-34037
                 URL: https://issues.apache.org/jira/browse/FLINK-34037
             Project: Flink
          Issue Type: Improvement
          Components: API / Type Serialization System, Runtime / Configuration
    Affects Versions: 1.19.0
            Reporter: Fang Yong


Improve serialization in 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-398%3A+Improve+Serialization+Configuration+And+Usage+In+Flink





[jira] [Updated] (FLINK-34036) Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with Hadoop 3.1.3 enabled

2024-01-08 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-34036:
--
Description: 
The following {{HiveDialectQueryITCase}} tests fail consistently in the 
FLINK-27075 GitHub Actions [master nightly 
workflow|https://github.com/XComp/flink/actions/workflows/nightly-dev.yml] of 
Flink but not 
[release-1.18|https://github.com/XComp/flink/actions/workflows/nightly-current.yml]:
* {{testInsertDirectory}}
* {{testCastTimeStampToDecimal}}
* {{testNullLiteralAsArgument}}
{code}
Error: 03:38:45 03:38:45.661 [ERROR] Tests run: 22, Failures: 1, Errors: 2, 
Skipped: 0, Time elapsed: 379.0 s <<< FAILURE! -- in 
org.apache.flink.connectors.hive.HiveDialectQueryITCase
Error: 03:38:45 03:38:45.662 [ERROR] 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument
 -- Time elapsed: 0.069 s <<< ERROR!
Jan 09 03:38:45 java.lang.NoSuchMethodError: 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.getTimestamp(Ljava/lang/Object;Lorg/apache/hadoop/hive/serde2/objectinspector/PrimitiveObjectInspector;)Ljava/sql/Timestamp;
Jan 09 03:38:45 at 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument(HiveDialectQueryITCase.java:959)
Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498)
Jan 09 03:38:45 
Error: 03:38:45 03:38:45.662 [ERROR] 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal
 -- Time elapsed: 0.007 s <<< ERROR!
Jan 09 03:38:45 org.apache.flink.table.api.ValidationException: Table with 
identifier 'test-catalog.default.t1' does not exist.
Jan 09 03:38:45 at 
org.apache.flink.table.catalog.CatalogManager.dropTableInternal(CatalogManager.java:1266)
Jan 09 03:38:45 at 
org.apache.flink.table.catalog.CatalogManager.dropTable(CatalogManager.java:1206)
Jan 09 03:38:45 at 
org.apache.flink.table.operations.ddl.DropTableOperation.execute(DropTableOperation.java:74)
Jan 09 03:38:45 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107)
Jan 09 03:38:45 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
Jan 09 03:38:45 at 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal(HiveDialectQueryITCase.java:835)
Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498)
Jan 09 03:38:45 
Error: 03:38:45 03:38:45.663 [ERROR] 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory -- 
Time elapsed: 7.326 s <<< FAILURE!
Jan 09 03:38:45 org.opentest4j.AssertionFailedError: 
Jan 09 03:38:45 
Jan 09 03:38:45 expected: "A:english=90#math=100#history=85"
Jan 09 03:38:45  but was: "A:english=90math=100history=85"
Jan 09 03:38:45 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Jan 09 03:38:45 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
Jan 09 03:38:45 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
Jan 09 03:38:45 at 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory(HiveDialectQueryITCase.java:498)
Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498)
{code}

The most recent GHA workflow failures are:
* 
https://github.com/XComp/flink/actions/runs/7455836411/job/20285758541#step:12:23332
* 
https://github.com/XComp/flink/actions/runs/7447254277/job/20259593089#step:12:23378
* 
https://github.com/XComp/flink/actions/runs/7442459819/job/20246101021#step:12:23332
* 
https://github.com/XComp/flink/actions/runs/7438111934/job/20236674470#step:12:23375
* 
https://github.com/XComp/flink/actions/runs/7435499743/job/20231030744#step:12:23367
Interestingly, the failure doesn't appear in the Azure Pipelines nightlies.

  was:
The following {{HiveDialectQueryITCase}} tests fail consistently in the 
FLINK-27075 GitHub Actions nightly workflow of Flink on {{master}} (but not 
{{release-1.18}}!):
* {{testInsertDirectory}}
* {{testCastTimeStampToDecimal}}
* {{testNullLiteralAsArgument}}
{code}
Error: 03:38:45 03:38:45.661 [ERROR] Tests run: 22, Failures: 1, Errors: 2, 
Skipped: 0, Time elapsed: 379.0 s <<< FAILURE! -- in 
org.apache.flink.connectors.hive.HiveDialectQueryITCase
Error: 03:38:45 03:38:45.662 [ERROR] 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument
 -- Time elapsed: 0.069 s <<< ERROR!
Jan 09 03:38:45 java.lang.NoSuchMethodError: 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.getTimestamp(Ljava/lang/Object;Lorg/apache/hadoop/hive/serde2/objectinspector/PrimitiveObjectInspector;)Ljava/sql/Timestamp;
Jan 09 03:38:45 at 

[jira] [Updated] (FLINK-34036) Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with Hadoop 3.1.3 enabled

2024-01-08 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-34036:
--
Description: 
The following {{HiveDialectQueryITCase}} tests fail consistently in the 
FLINK-27075 GitHub Actions [master nightly 
workflow|https://github.com/XComp/flink/actions/workflows/nightly-dev.yml] of 
Flink but not the [release-1.18 
workflow|https://github.com/XComp/flink/actions/workflows/nightly-current.yml]:
* {{testInsertDirectory}}
* {{testCastTimeStampToDecimal}}
* {{testNullLiteralAsArgument}}
{code}
Error: 03:38:45 03:38:45.661 [ERROR] Tests run: 22, Failures: 1, Errors: 2, 
Skipped: 0, Time elapsed: 379.0 s <<< FAILURE! -- in 
org.apache.flink.connectors.hive.HiveDialectQueryITCase
Error: 03:38:45 03:38:45.662 [ERROR] 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument
 -- Time elapsed: 0.069 s <<< ERROR!
Jan 09 03:38:45 java.lang.NoSuchMethodError: 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.getTimestamp(Ljava/lang/Object;Lorg/apache/hadoop/hive/serde2/objectinspector/PrimitiveObjectInspector;)Ljava/sql/Timestamp;
Jan 09 03:38:45 at 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument(HiveDialectQueryITCase.java:959)
Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498)
Jan 09 03:38:45 
Error: 03:38:45 03:38:45.662 [ERROR] 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal
 -- Time elapsed: 0.007 s <<< ERROR!
Jan 09 03:38:45 org.apache.flink.table.api.ValidationException: Table with 
identifier 'test-catalog.default.t1' does not exist.
Jan 09 03:38:45 at 
org.apache.flink.table.catalog.CatalogManager.dropTableInternal(CatalogManager.java:1266)
Jan 09 03:38:45 at 
org.apache.flink.table.catalog.CatalogManager.dropTable(CatalogManager.java:1206)
Jan 09 03:38:45 at 
org.apache.flink.table.operations.ddl.DropTableOperation.execute(DropTableOperation.java:74)
Jan 09 03:38:45 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107)
Jan 09 03:38:45 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
Jan 09 03:38:45 at 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal(HiveDialectQueryITCase.java:835)
Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498)
Jan 09 03:38:45 
Error: 03:38:45 03:38:45.663 [ERROR] 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory -- 
Time elapsed: 7.326 s <<< FAILURE!
Jan 09 03:38:45 org.opentest4j.AssertionFailedError: 
Jan 09 03:38:45 
Jan 09 03:38:45 expected: "A:english=90#math=100#history=85"
Jan 09 03:38:45  but was: "A:english=90math=100history=85"
Jan 09 03:38:45 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Jan 09 03:38:45 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
Jan 09 03:38:45 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
Jan 09 03:38:45 at 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory(HiveDialectQueryITCase.java:498)
Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498)
{code}

The most recent GHA workflow failures are:
* 
https://github.com/XComp/flink/actions/runs/7455836411/job/20285758541#step:12:23332
* 
https://github.com/XComp/flink/actions/runs/7447254277/job/20259593089#step:12:23378
* 
https://github.com/XComp/flink/actions/runs/7442459819/job/20246101021#step:12:23332
* 
https://github.com/XComp/flink/actions/runs/7438111934/job/20236674470#step:12:23375
* 
https://github.com/XComp/flink/actions/runs/7435499743/job/20231030744#step:12:23367

Interestingly, the failure doesn't appear in the Azure Pipelines nightlies.

  was:
The following {{HiveDialectQueryITCase}} tests fail consistently in the 
FLINK-27075 GitHub Actions [master nightly 
workflow|https://github.com/XComp/flink/actions/workflows/nightly-dev.yml] of 
Flink but not the [release-1.18 
workflow|https://github.com/XComp/flink/actions/workflows/nightly-current.yml]:
* {{testInsertDirectory}}
* {{testCastTimeStampToDecimal}}
* {{testNullLiteralAsArgument}}
{code}
Error: 03:38:45 03:38:45.661 [ERROR] Tests run: 22, Failures: 1, Errors: 2, 
Skipped: 0, Time elapsed: 379.0 s <<< FAILURE! -- in 
org.apache.flink.connectors.hive.HiveDialectQueryITCase
Error: 03:38:45 03:38:45.662 [ERROR] 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument
 -- Time elapsed: 0.069 s <<< ERROR!
Jan 09 03:38:45 java.lang.NoSuchMethodError: 

[jira] [Updated] (FLINK-34036) Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with Hadoop 3.1.3 enabled

2024-01-08 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-34036:
--
Description: 
The following {{HiveDialectQueryITCase}} tests fail consistently in the 
FLINK-27075 GitHub Actions [master nightly 
workflow|https://github.com/XComp/flink/actions/workflows/nightly-dev.yml] of 
Flink but not the [release-1.18 
workflow|https://github.com/XComp/flink/actions/workflows/nightly-current.yml]:
* {{testInsertDirectory}}
* {{testCastTimeStampToDecimal}}
* {{testNullLiteralAsArgument}}
{code}
Error: 03:38:45 03:38:45.661 [ERROR] Tests run: 22, Failures: 1, Errors: 2, 
Skipped: 0, Time elapsed: 379.0 s <<< FAILURE! -- in 
org.apache.flink.connectors.hive.HiveDialectQueryITCase
Error: 03:38:45 03:38:45.662 [ERROR] 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument
 -- Time elapsed: 0.069 s <<< ERROR!
Jan 09 03:38:45 java.lang.NoSuchMethodError: 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.getTimestamp(Ljava/lang/Object;Lorg/apache/hadoop/hive/serde2/objectinspector/PrimitiveObjectInspector;)Ljava/sql/Timestamp;
Jan 09 03:38:45 at 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument(HiveDialectQueryITCase.java:959)
Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498)
Jan 09 03:38:45 
Error: 03:38:45 03:38:45.662 [ERROR] 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal
 -- Time elapsed: 0.007 s <<< ERROR!
Jan 09 03:38:45 org.apache.flink.table.api.ValidationException: Table with 
identifier 'test-catalog.default.t1' does not exist.
Jan 09 03:38:45 at 
org.apache.flink.table.catalog.CatalogManager.dropTableInternal(CatalogManager.java:1266)
Jan 09 03:38:45 at 
org.apache.flink.table.catalog.CatalogManager.dropTable(CatalogManager.java:1206)
Jan 09 03:38:45 at 
org.apache.flink.table.operations.ddl.DropTableOperation.execute(DropTableOperation.java:74)
Jan 09 03:38:45 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107)
Jan 09 03:38:45 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
Jan 09 03:38:45 at 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal(HiveDialectQueryITCase.java:835)
Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498)
Jan 09 03:38:45 
Error: 03:38:45 03:38:45.663 [ERROR] 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory -- 
Time elapsed: 7.326 s <<< FAILURE!
Jan 09 03:38:45 org.opentest4j.AssertionFailedError: 
Jan 09 03:38:45 
Jan 09 03:38:45 expected: "A:english=90#math=100#history=85"
Jan 09 03:38:45  but was: "A:english=90math=100history=85"
Jan 09 03:38:45 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Jan 09 03:38:45 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
Jan 09 03:38:45 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
Jan 09 03:38:45 at 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory(HiveDialectQueryITCase.java:498)
Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498)
{code}

The most recent build failures in the GHA workflow are:
* 
https://github.com/XComp/flink/actions/runs/7455836411/job/20285758541#step:12:23332
* 
https://github.com/XComp/flink/actions/runs/7447254277/job/20259593089#step:12:23378
* 
https://github.com/XComp/flink/actions/runs/7442459819/job/20246101021#step:12:23332
* 
https://github.com/XComp/flink/actions/runs/7438111934/job/20236674470#step:12:23375
* 
https://github.com/XComp/flink/actions/runs/7435499743/job/20231030744#step:12:23367
Interestingly, the failure doesn't appear in the Azure Pipelines nightlies.

  was:
The following {{HiveDialectQueryITCase}} tests fail consistently in the 
FLINK-27075 GitHub Actions [master nightly 
workflow|https://github.com/XComp/flink/actions/workflows/nightly-dev.yml] of 
Flink but not 
[release-1.18|https://github.com/XComp/flink/actions/workflows/nightly-current.yml]:
* {{testInsertDirectory}}
* {{testCastTimeStampToDecimal}}
* {{testNullLiteralAsArgument}}
{code}
Error: 03:38:45 03:38:45.661 [ERROR] Tests run: 22, Failures: 1, Errors: 2, 
Skipped: 0, Time elapsed: 379.0 s <<< FAILURE! -- in 
org.apache.flink.connectors.hive.HiveDialectQueryITCase
Error: 03:38:45 03:38:45.662 [ERROR] 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument
 -- Time elapsed: 0.069 s <<< ERROR!
Jan 09 03:38:45 java.lang.NoSuchMethodError: 

Re: [PR] [FLINK-33583][table-planner] Support state ttl hint for regular join [flink]

2024-01-08 Thread via GitHub


xuyangzhong commented on code in PR #23752:
URL: https://github.com/apache/flink/pull/23752#discussion_r1445732928


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala:
##
@@ -116,13 +123,27 @@ class StreamPhysicalJoin(
   }
 
   override def translateToExecNode(): ExecNode[_] = {
+    val stateTtlFromHint = new util.HashMap[JInt, JLong]
+    getHints
+      .filter(hint => StateTtlHint.isStateTtlHint(hint.hintName))
+      .forEach {
+        hint =>
+          hint.kvOptions.forEach(
+            (input, ttl) =>
+              stateTtlFromHint
+                .put(
+                  if (input == FlinkHints.LEFT_INPUT) 0 else 1,

Review Comment:
   Actually, the value of the original state TTL hint written by the user is 
already in a standardized format (duration); otherwise an exception would be 
thrown directly. There seems to be no need to uniformly convert it to 
milliseconds.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34036) Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with Hadoop 3.1.3 enabled

2024-01-08 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804614#comment-17804614
 ] 

Matthias Pohl commented on FLINK-34036:
---

[~yuxia] Do you have an idea about the cause?
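
A note for anyone triaging the {{NoSuchMethodError}} quoted below: the JVM links a call site by method name plus the full descriptor, and the descriptor includes the return type (here {{Ljava/sql/Timestamp;}}). One possible reading, not confirmed in this thread, is that the class on the test classpath has a {{getTimestamp}} method with the same parameters but a different return type. The sketch below shows a reflection check that also compares the return type. It uses JDK classes as stand-ins, and the names {{DescriptorProbe}} and {{hasExactMethod}} are hypothetical, not part of Flink or Hive.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of Flink or Hive.
final class DescriptorProbe {

    /** Returns true only if owner exposes a public method matching name, parameters AND return type. */
    static boolean hasExactMethod(
            Class<?> owner, String name, Class<?> returnType, Class<?>... paramTypes) {
        for (Method m : owner.getMethods()) {
            if (m.getName().equals(name)
                    && Arrays.equals(m.getParameterTypes(), paramTypes)
                    && m.getReturnType().equals(returnType)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // JDK stand-in: Integer.valueOf(String) exists and returns Integer ...
        System.out.println(hasExactMethod(Integer.class, "valueOf", Integer.class, String.class));
        // ... but no valueOf(String) returning the primitive int exists.
        System.out.println(hasExactMethod(Integer.class, "valueOf", int.class, String.class));
    }
}
```

Running the same kind of check against the Hive jar that the failing job actually loads would show whether a signature mismatch is involved at all.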

> Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with 
> Hadoop 3.1.3 enabled
> --
>
> Key: FLINK-34036
> URL: https://issues.apache.org/jira/browse/FLINK-34036
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hadoop Compatibility, Connectors / Hive
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: github-actions, test-stability
>
> The following {{HiveDialectQueryITCase}} tests fail consistently in the 
> FLINK-27075 GitHub Actions nightly workflow of Flink on {{master}} (but not 
> {{release-1.18}}!):
> * {{testInsertDirectory}}
> * {{testCastTimeStampToDecimal}}
> * {{testNullLiteralAsArgument}}
> {code}
> Error: 03:38:45 03:38:45.661 [ERROR] Tests run: 22, Failures: 1, Errors: 2, 
> Skipped: 0, Time elapsed: 379.0 s <<< FAILURE! -- in 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase
> Error: 03:38:45 03:38:45.662 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument
>  -- Time elapsed: 0.069 s <<< ERROR!
> Jan 09 03:38:45 java.lang.NoSuchMethodError: 
> org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.getTimestamp(Ljava/lang/Object;Lorg/apache/hadoop/hive/serde2/objectinspector/PrimitiveObjectInspector;)Ljava/sql/Timestamp;
> Jan 09 03:38:45   at 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument(HiveDialectQueryITCase.java:959)
> Jan 09 03:38:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 09 03:38:45 
> Error: 03:38:45 03:38:45.662 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal
>  -- Time elapsed: 0.007 s <<< ERROR!
> Jan 09 03:38:45 org.apache.flink.table.api.ValidationException: Table with 
> identifier 'test-catalog.default.t1' does not exist.
> Jan 09 03:38:45   at 
> org.apache.flink.table.catalog.CatalogManager.dropTableInternal(CatalogManager.java:1266)
> Jan 09 03:38:45   at 
> org.apache.flink.table.catalog.CatalogManager.dropTable(CatalogManager.java:1206)
> Jan 09 03:38:45   at 
> org.apache.flink.table.operations.ddl.DropTableOperation.execute(DropTableOperation.java:74)
> Jan 09 03:38:45   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107)
> Jan 09 03:38:45   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
> Jan 09 03:38:45   at 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal(HiveDialectQueryITCase.java:835)
> Jan 09 03:38:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 09 03:38:45 
> Error: 03:38:45 03:38:45.663 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory 
> -- Time elapsed: 7.326 s <<< FAILURE!
> Jan 09 03:38:45 org.opentest4j.AssertionFailedError: 
> Jan 09 03:38:45 
> Jan 09 03:38:45 expected: "A:english=90#math=100#history=85"
> Jan 09 03:38:45  but was: "A:english=90math=100history=85"
> Jan 09 03:38:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Jan 09 03:38:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Jan 09 03:38:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Jan 09 03:38:45   at 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory(HiveDialectQueryITCase.java:498)
> Jan 09 03:38:45   at java.lang.reflect.Method.invoke(Method.java:498)
> {code}
> The most recent build failures in the GHA workflow are:
> * 
> https://github.com/XComp/flink/actions/runs/7455836411/job/20285758541#step:12:23332
> * 
> https://github.com/XComp/flink/actions/runs/7447254277/job/20259593089#step:12:23378
> * 
> https://github.com/XComp/flink/actions/runs/7442459819/job/20246101021#step:12:23332
> * 
> https://github.com/XComp/flink/actions/runs/7438111934/job/20236674470#step:12:23375
> * 
> https://github.com/XComp/flink/actions/runs/7435499743/job/20231030744#step:12:23367
> Interestingly, the failure doesn't appear in the Azure Pipelines nightlies.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34036) Various HiveDialectQueryITCase tests fail in GitHub Actions workflow with Hadoop 3.1.3 enabled

2024-01-08 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-34036:
-

 Summary: Various HiveDialectQueryITCase tests fail in GitHub 
Actions workflow with Hadoop 3.1.3 enabled
 Key: FLINK-34036
 URL: https://issues.apache.org/jira/browse/FLINK-34036
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hadoop Compatibility, Connectors / Hive
Affects Versions: 1.19.0
Reporter: Matthias Pohl


The following {{HiveDialectQueryITCase}} tests fail consistently in the 
FLINK-27075 GitHub Actions nightly workflow of Flink on {{master}} (but not 
{{release-1.18}}!):
* {{testInsertDirectory}}
* {{testCastTimeStampToDecimal}}
* {{testNullLiteralAsArgument}}
{code}
Error: 03:38:45 03:38:45.661 [ERROR] Tests run: 22, Failures: 1, Errors: 2, 
Skipped: 0, Time elapsed: 379.0 s <<< FAILURE! -- in 
org.apache.flink.connectors.hive.HiveDialectQueryITCase
Error: 03:38:45 03:38:45.662 [ERROR] 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument
 -- Time elapsed: 0.069 s <<< ERROR!
Jan 09 03:38:45 java.lang.NoSuchMethodError: 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.getTimestamp(Ljava/lang/Object;Lorg/apache/hadoop/hive/serde2/objectinspector/PrimitiveObjectInspector;)Ljava/sql/Timestamp;
Jan 09 03:38:45 at 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testNullLiteralAsArgument(HiveDialectQueryITCase.java:959)
Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498)
Jan 09 03:38:45 
Error: 03:38:45 03:38:45.662 [ERROR] 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal
 -- Time elapsed: 0.007 s <<< ERROR!
Jan 09 03:38:45 org.apache.flink.table.api.ValidationException: Table with 
identifier 'test-catalog.default.t1' does not exist.
Jan 09 03:38:45 at 
org.apache.flink.table.catalog.CatalogManager.dropTableInternal(CatalogManager.java:1266)
Jan 09 03:38:45 at 
org.apache.flink.table.catalog.CatalogManager.dropTable(CatalogManager.java:1206)
Jan 09 03:38:45 at 
org.apache.flink.table.operations.ddl.DropTableOperation.execute(DropTableOperation.java:74)
Jan 09 03:38:45 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107)
Jan 09 03:38:45 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
Jan 09 03:38:45 at 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testCastTimeStampToDecimal(HiveDialectQueryITCase.java:835)
Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498)
Jan 09 03:38:45 
Error: 03:38:45 03:38:45.663 [ERROR] 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory -- 
Time elapsed: 7.326 s <<< FAILURE!
Jan 09 03:38:45 org.opentest4j.AssertionFailedError: 
Jan 09 03:38:45 
Jan 09 03:38:45 expected: "A:english=90#math=100#history=85"
Jan 09 03:38:45  but was: "A:english=90math=100history=85"
Jan 09 03:38:45 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Jan 09 03:38:45 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
Jan 09 03:38:45 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
Jan 09 03:38:45 at 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testInsertDirectory(HiveDialectQueryITCase.java:498)
Jan 09 03:38:45 at java.lang.reflect.Method.invoke(Method.java:498)
{code}

The most recent build failures in the GHA workflow are:
* 
https://github.com/XComp/flink/actions/runs/7455836411/job/20285758541#step:12:23332
* 
https://github.com/XComp/flink/actions/runs/7447254277/job/20259593089#step:12:23378
* 
https://github.com/XComp/flink/actions/runs/7442459819/job/20246101021#step:12:23332
* 
https://github.com/XComp/flink/actions/runs/7438111934/job/20236674470#step:12:23375
* 
https://github.com/XComp/flink/actions/runs/7435499743/job/20231030744#step:12:23367
Interestingly, the failure doesn't appear in the Azure Pipelines nightlies.





[jira] [Comment Edited] (FLINK-33414) MiniClusterITCase.testHandleStreamingJobsWhenNotEnoughSlot fails due to unexpected TimeoutException

2024-01-08 Thread Jiang Xin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804610#comment-17804610
 ] 

Jiang Xin edited comment on FLINK-33414 at 1/9/24 7:44 AM:
---

[~Sergey Nuyanzin] Can it be consistently reproduced on AZP? I ran this test 
locally hundreds of times, but no hang happened.


was (Author: jiang xin):
[~Sergey Nuyanzin] Is it can be consistently reproduced on AZP? I ran this test 
locally for hundreds of times, but no hang happened.

> MiniClusterITCase.testHandleStreamingJobsWhenNotEnoughSlot fails due to 
> unexpected TimeoutException
> ---
>
> Key: FLINK-33414
> URL: https://issues.apache.org/jira/browse/FLINK-33414
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Assignee: Jiang Xin
>Priority: Critical
>  Labels: github-actions, pull-request-available, test-stability
> Fix For: 1.19.0
>
>
> We see this test instability in [this 
> build|https://github.com/XComp/flink/actions/runs/6695266358/job/18192039035#step:12:9253].
> {code:java}
> Error: 17:04:52 17:04:52.042 [ERROR] Failures: 
> 9252Error: 17:04:52 17:04:52.042 [ERROR]   
> MiniClusterITCase.testHandleStreamingJobsWhenNotEnoughSlot:120 
> 9253Oct 30 17:04:52 Expecting a throwable with root cause being an instance 
> of:
> 9254Oct 30 17:04:52   
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException
> 9255Oct 30 17:04:52 but was an instance of:
> 9256Oct 30 17:04:52   java.util.concurrent.TimeoutException: Timeout has 
> occurred: 100 ms
> 9257Oct 30 17:04:52   at 
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
> 9258Oct 30 17:04:52   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 9259Oct 30 17:04:52   at 
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 9260Oct 30 17:04:52   ...(27 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed) {code}
> The same error occurred in the [finegrained_resourcemanager stage of this 
> build|https://github.com/XComp/flink/actions/runs/6468655160/job/17563927249#step:11:26516]
>  (as reported in FLINK-33245).





[jira] [Commented] (FLINK-34034) When kv hint and list hint handle duplicate query hints, the results are different.

2024-01-08 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804611#comment-17804611
 ] 

xuyang commented on FLINK-34034:


Hi, [~qingyue]

Our documentation currently does not seem to expose whether table hints and 
join hints are kv hints or list hints. I don't think exposing this to users 
would be beneficial, because we would then have to explain their different 
behaviors: on a conflict, a list hint chooses the first one, whereas a kv hint, 
processed by Calcite, selects the last one when the keys are the same. I would 
prefer to let the framework ensure that these behaviors are consistent. WDYT?
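
To make the two behaviors concrete, here is a minimal sketch of last-wins versus first-wins resolution. It only models the semantics described above; {{HintConflictDemo}}, {{resolveKv}} and {{resolveList}} are hypothetical names, not Flink or Calcite APIs, and the hint strings are placeholders.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; these helpers are not Flink or Calcite APIs.
final class HintConflictDemo {

    /** Last-wins: a later option with the same key overwrites the earlier one. */
    static Map<String, String> resolveKv(List<String[]> options) {
        Map<String, String> resolved = new LinkedHashMap<>();
        for (String[] kv : options) {
            resolved.put(kv[0], kv[1]);
        }
        return resolved;
    }

    /** First-wins: of several duplicated hints, only the first takes effect. */
    static String resolveList(List<String> hints) {
        return hints.isEmpty() ? null : hints.get(0);
    }

    public static void main(String[] args) {
        List<String[]> kv =
                Arrays.asList(
                        new String[] {"max-attempts", "3"},
                        new String[] {"max-attempts", "4"});
        System.out.println(resolveKv(kv).get("max-attempts")); // prints 4
        System.out.println(resolveList(Arrays.asList("hint-a", "hint-b"))); // prints hint-a
    }
}
```

The kv case mirrors the plan quoted below, where the duplicated 'max-attempts' option ends up as 4.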

> When kv hint and list hint handle duplicate query hints, the results are 
> different.
> ---
>
> Key: FLINK-34034
> URL: https://issues.apache.org/jira/browse/FLINK-34034
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: xuyang
>Assignee: xuyang
>Priority: Minor
>
> When there are duplicate keys in the kv hint, calcite will overwrite the 
> previous value with the later value.
> {code:java}
> @TestTemplate
> def test(): Unit = {
>   val sql =
> "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 
> 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3', 
> 'max-attempts'='4') */ * FROM MyTable AS T JOIN LookupTable " +
>   "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"
>   util.verifyExecPlan(sql)
> } {code}
> {code:java}
> Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, 
> id, name, age]) 
>   +- LookupJoin(table=[default_catalog.default_database.LookupTable], 
> joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
> name, age], retry=[lookup_miss, FIXED_DELAY, 1ms, 4]) 
> +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
> fields=[a, b, c, proctime, rowtime])
> {code}
> But when a list hint is duplicated (such as a join hint), we will choose the 
> first one as the effective hint.
>  





[jira] [Commented] (FLINK-33414) MiniClusterITCase.testHandleStreamingJobsWhenNotEnoughSlot fails due to unexpected TimeoutException

2024-01-08 Thread Jiang Xin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804610#comment-17804610
 ] 

Jiang Xin commented on FLINK-33414:
---

[~Sergey Nuyanzin] Is it can be consistently reproduced on AZP? I ran this test 
locally for hundreds of times, but no hang happened.

> MiniClusterITCase.testHandleStreamingJobsWhenNotEnoughSlot fails due to 
> unexpected TimeoutException
> ---
>
> Key: FLINK-33414
> URL: https://issues.apache.org/jira/browse/FLINK-33414
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Assignee: Jiang Xin
>Priority: Critical
>  Labels: github-actions, pull-request-available, test-stability
> Fix For: 1.19.0
>
>
> We see this test instability in [this 
> build|https://github.com/XComp/flink/actions/runs/6695266358/job/18192039035#step:12:9253].
> {code:java}
> Error: 17:04:52 17:04:52.042 [ERROR] Failures: 
> 9252Error: 17:04:52 17:04:52.042 [ERROR]   
> MiniClusterITCase.testHandleStreamingJobsWhenNotEnoughSlot:120 
> 9253Oct 30 17:04:52 Expecting a throwable with root cause being an instance 
> of:
> 9254Oct 30 17:04:52   
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException
> 9255Oct 30 17:04:52 but was an instance of:
> 9256Oct 30 17:04:52   java.util.concurrent.TimeoutException: Timeout has 
> occurred: 100 ms
> 9257Oct 30 17:04:52   at 
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
> 9258Oct 30 17:04:52   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 9259Oct 30 17:04:52   at 
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 9260Oct 30 17:04:52   ...(27 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed) {code}
> The same error occurred in the [finegrained_resourcemanager stage of this 
> build|https://github.com/XComp/flink/actions/runs/6468655160/job/17563927249#step:11:26516]
>  (as reported in FLINK-33245).





Re: [PR] [FLINK-33879] Avoids the potential hang of Hybrid Shuffle during redistribution [flink]

2024-01-08 Thread via GitHub


reswqa commented on code in PR #23957:
URL: https://github.com/apache/flink/pull/23957#discussion_r1445651286


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##
@@ -244,6 +251,58 @@ public void release() {
                 ExceptionUtils.rethrow(e);
             }
         }
+        while (!bufferQueue.isEmpty()) {
+            MemorySegment segment = bufferQueue.poll();
+            bufferPool.recycle(segment);
+            numRequestedBuffers.decrementAndGet();
+        }
+    }
+
+    /**
+     * Returns a memory segment from the buffer pool or null if the memory manager has requested all
+     * segments of the buffer pool.
+     */
+    private MemorySegment requestBufferBlockingFromPool() {

Review Comment:
   This should be marked as `Nullable` or return an `Optional`.
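
   A minimal sketch of the `Optional` variant, assuming a simplified stand-in for the memory manager: `SegmentSource` and `requestFromPool` are hypothetical names, and `byte[]` replaces `MemorySegment` so the example is self-contained.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

// Hypothetical stand-in for the memory manager; byte[] replaces MemorySegment.
final class SegmentSource {
    private final Queue<byte[]> pool = new ArrayDeque<>();

    SegmentSource(int numSegments, int segmentSize) {
        for (int i = 0; i < numSegments; i++) {
            pool.add(new byte[segmentSize]);
        }
    }

    /**
     * Requests a segment from the pool without blocking.
     *
     * @return a segment, or an empty Optional if all segments have already been handed out
     */
    Optional<byte[]> requestFromPool() {
        // ArrayDeque#poll returns null when the queue is empty.
        return Optional.ofNullable(pool.poll());
    }
}
```

   With this shape the caller has to handle the empty case explicitly, for example via `orElseGet(...)`, instead of relying on a null check that is easy to forget.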



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##
@@ -173,49 +187,42 @@ public BufferBuilder requestBufferBlocking(Object owner) {
 
         reclaimBuffersIfNeeded(0);
 
-        CompletableFuture requestBufferFuture = new CompletableFuture<>();
-        scheduleCheckRequestBufferFuture(
-                requestBufferFuture, INITIAL_REQUEST_BUFFER_TIMEOUT_FOR_RECLAIMING_MS);
-        MemorySegment memorySegment = bufferPool.requestMemorySegment();
-
+        MemorySegment memorySegment = bufferQueue.poll();
         if (memorySegment == null) {
-            try {
-                hardBackpressureTimerGauge.markStart();
-                memorySegment = bufferPool.requestMemorySegmentBlocking();
-                hardBackpressureTimerGauge.markEnd();
-            } catch (InterruptedException e) {
-                ExceptionUtils.rethrow(e);
-            }
+            memorySegment = requestBufferBlockingFromPool();
+        }
+        if (memorySegment == null) {
+            memorySegment = requestBufferBlockingFromQueue();

Review Comment:
   ```suggestion
   memorySegment = checkNotNull(requestBufferBlockingFromQueue());
   ```
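
   The intent of the suggestion is to fail fast if the last fallback also yields nothing. A sketch of that pattern, using `java.util.Objects.requireNonNull` as a JDK stand-in for Flink's `Preconditions.checkNotNull`; `FallbackRequest` and `firstNonNull` are hypothetical names.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical helper for illustration only.
final class FallbackRequest {

    /** Tries the primary source first; the fallback is the last resort and must not return null. */
    static <T> T firstNonNull(Supplier<T> primary, Supplier<T> fallback) {
        T value = primary.get();
        if (value == null) {
            // Throws NullPointerException right here instead of at a later dereference.
            value = Objects.requireNonNull(fallback.get(), "fallback must not return null");
        }
        return value;
    }
}
```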



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##
@@ -244,6 +251,58 @@ public void release() {
                 ExceptionUtils.rethrow(e);
             }
         }
+        while (!bufferQueue.isEmpty()) {
+            MemorySegment segment = bufferQueue.poll();
+            bufferPool.recycle(segment);
+            numRequestedBuffers.decrementAndGet();
+        }
+    }
+
+    /**
+     * Returns a memory segment from the buffer pool or null if the memory manager has requested all
+     * segments of the buffer pool.
+     */

Review Comment:
   This reads like the description for the method's `@return` tag rather than 
a Javadoc summary.



##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java:
##
@@ -192,15 +188,34 @@ void testMetricsUpdateForBroadcastOnlyResultPartition() throws Exception {
         try (TieredResultPartition partition =
                 createTieredStoreResultPartition(2, bufferPool, true)) {
             partition.broadcastRecord(ByteBuffer.allocate(bufferSize));
-            IOMetrics ioMetrics = taskIOMetricGroup.createSnapshot();
-            assertThat(ioMetrics.getResultPartitionBytes()).hasSize(1);
-            ResultPartitionBytes partitionBytes =
-                    ioMetrics.getResultPartitionBytes().values().iterator().next();
-            assertThat(partitionBytes.getSubpartitionBytes())
-                    .containsExactly(bufferSize, bufferSize);
+            verifySubpartitionBytes(bufferSize, bufferSize);
         }
     }
 
+    @Test
+    void testRequestBuffersAfterPoolSizeDecreased() throws IOException {
+        final int numBuffers = 15;
+        final int numRecords = 100;
+        BufferPool bufferPool = globalPool.createBufferPool(1, numBuffers);
+        TieredResultPartition resultPartition =
+                createTieredStoreResultPartitionWithStorageManager(1, bufferPool, false);
+
+        // Emits some records to fill up all buffers of memory tier, these buffers would not be
+        // recycled until the subpartitionView is released manually.
+        for (int i = 0; i < numRecords; i++) {
+            resultPartition.emitRecord(ByteBuffer.allocate(NETWORK_BUFFER_SIZE), 0);
+        }
+        verifySubpartitionBytes(numRecords * NETWORK_BUFFER_SIZE);
+
+        bufferPool.setNumBuffers(1);

Review Comment:
   Better to check that there are no more buffers in the buffer pool before 
this.



##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImplTest.java:
##
@@ -242,19 +173,49 @@ void testCanNotTransferOwnershipForEvent() throws 
IOException {
  

[jira] [Commented] (FLINK-33233) Null point exception when non-native udf used in join condition

2024-01-08 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804607#comment-17804607
 ] 

Matthias Pohl commented on FLINK-33233:
---

Out of curiosity: Is this really an improvement? It sounds more like a bug to 
me. If that's the case, it might be worth backporting it to 1.18 and 1.17 as 
well (since the affected version is stated as 1.17.0 in this Jira issue). 
[~luoyuxia] [~yunfanfight...@foxmail.com] WDYT?

> Null point exception when non-native udf used in join condition
> ---
>
> Key: FLINK-33233
> URL: https://issues.apache.org/jira/browse/FLINK-33233
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Affects Versions: 1.17.0
>Reporter: yunfan
>Assignee: yunfan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Any non-native UDF used in a hive-parser join condition causes a 
> NullPointerException.
> It can be reproduced by adding the following test to 
> {code:java}
> org.apache.flink.connectors.hive.HiveDialectQueryITCase{code}
>  
> {code:java}
> // Add the following code to org.apache.flink.connectors.hive.HiveDialectQueryITCase
> @Test
> public void testUdfInJoinCondition() throws Exception {
>     List result = CollectionUtil.iteratorToList(tableEnv.executeSql(
>         "select foo.y, bar.I from bar join foo on hiveudf(foo.x) = bar.I where bar.I > 1").collect());
>     assertThat(result.toString())
>         .isEqualTo("[+I[2, 2]]");
> } {code}





[jira] [Updated] (FLINK-34035) when flinksql with group by partition some errors field occured in jobmanager.log

2024-01-08 Thread hansonhe (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hansonhe updated FLINK-34035:
-
Description: 
flink.version=1.17.1
kyuubi.version=1.8.0
hive.version=3.1.2
When running Hive SQL such as the following:
CREATE CATALOG bidwhive WITH ('type' = 'hive', 'hive-version' = '3.1.2', 
'default-database' = 'test');
(1) select count(*) from bidwhive.test.dws_test where dt='2024-01-02';
+--------+
| EXPR$0 |
+--------+
| 1317   |
+--------+
This works. There are no errors anywhere.
(2) select dt,count(*) from bidwhive.test.dws_test where dt='2024-01-02' 
group by dt;

+------------+--------+
| dt         | EXPR$1 |
+------------+--------+
| 2024-01-02 | 1317   |
+------------+--------+

This returns the correct result. But when I check jobmanager.log, I find 
errors like the following appearing repeatedly. Sometimes the errors also 
appear on the client terminal. I don't know whether these errors affect the 
task at runtime. Could somebody help take a look?

'''
2024-01-09 14:03:25,979 WARN 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An 
exception occurred when fetching query results
java.util.concurrent.ExecutionException: org.apache.flink.util.FlinkException: 
Coordinator of operator e9a3cbdf90f308bdf13b34acd6410e2b does not exist or the 
job vertex this operator belongs to is not initialized. at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
~[?:1.8.0_191]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
~[?:1.8.0_191]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:170)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:129)
 [flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
 [flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
 [flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
 [flink-table-planner_b1e58bff-c004-4dba-b7d4-fff4e8145073.jar:1.17.1]
at 
org.apache.flink.table.gateway.service.result.ResultStore$ResultRetrievalThread.run(ResultStore.java:155)
 [flink-sql-gateway-1.17.1.jar:1.17.1]Caused by: 
org.apache.flink.util.FlinkException: Coordinator of operator 
e9a3cbdf90f308bdf13b34acd6410e2b does not exist or the job vertex this operator 
belongs to is not initialized. at 
org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverCoordinationRequestToCoordinator(DefaultOperatorCoordinatorHandler.java:135)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.deliverCoordinationRequestToCoordinator(SchedulerBase.java:1048)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.jobmaster.JobMaster.sendRequestToCoordinator(JobMaster.java:602)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.jobmaster.JobMaster.deliverCoordinationRequestToCoordinator(JobMaster.java:918)
 ~[flink-dist-1.17.1.jar:1.17.1]
at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source) ~[?:?]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_191]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
 ~[?:?]
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
 ~[?:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
 ~[?:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
 ~[?:?]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
 ~[?:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
 ~[?:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) 
~[flink-scala_2.12-1.17.1.jar:1.17.1]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) 
~[flink-scala_2.12-1.17.1.jar:1.17.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) 
~[flink-scala_2.12-1.17.1.jar:1.17.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
~[flink-scala_2.12-1.17.1.jar:1.17.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 

[jira] [Updated] (FLINK-34035) when flinksql with group by partition some errors field occured in jobmanager.log

2024-01-08 Thread hansonhe (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hansonhe updated FLINK-34035:
-
Description: 
flink.version=1.17.1
kyuubi.version=1.8.0
hive.version=3.1.2
When running some Hive SQL as follows:
CREATE CATALOG bidwhive WITH ('type' = 'hive', 'hive-version' = '3.1.2', 
'default-database' = 'test');
(1) select count(*) from bidwhive.test.dws_test where dt='2024-01-02';
+--------+
| EXPR$0 |
+--------+
|   1317 |
+--------+
This works. There are no errors anywhere.
(2) select dt, count(*) from bidwhive.test.dws_test where dt='2024-01-02' 
group by dt;

+------------+--------+
| dt         | EXPR$1 |
+------------+--------+
| 2024-01-02 |   1317 |
+------------+--------+

The query returns the correct result. But when I check jobmanager.log, I find 
that the following errors appear repeatedly. Sometimes the errors also appear 
on the client terminal. I don't know whether these errors affect the task at 
runtime. Could somebody help me take a look?

2024-01-09 14:03:25,979 WARN 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An 
exception occurred when fetching query results
java.util.concurrent.ExecutionException: org.apache.flink.util.FlinkException: 
Coordinator of operator e9a3cbdf90f308bdf13b34acd6410e2b does not exist or the 
job vertex this operator belongs to is not initialized.
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
~[?:1.8.0_191]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
~[?:1.8.0_191]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:170)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:129)
 [flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
 [flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
 [flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
 [flink-table-planner_b1e58bff-c004-4dba-b7d4-fff4e8145073.jar:1.17.1]
at 
org.apache.flink.table.gateway.service.result.ResultStore$ResultRetrievalThread.run(ResultStore.java:155)
 [flink-sql-gateway-1.17.1.jar:1.17.1]
Caused by: org.apache.flink.util.FlinkException: Coordinator of operator 
e9a3cbdf90f308bdf13b34acd6410e2b does not exist or the job vertex this operator 
belongs to is not initialized.
at 
org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverCoordinationRequestToCoordinator(DefaultOperatorCoordinatorHandler.java:135)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.deliverCoordinationRequestToCoordinator(SchedulerBase.java:1048)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.jobmaster.JobMaster.sendRequestToCoordinator(JobMaster.java:602)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.jobmaster.JobMaster.deliverCoordinationRequestToCoordinator(JobMaster.java:918)
 ~[flink-dist-1.17.1.jar:1.17.1]
at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source) ~[?:?]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_191]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
 ~[?:?]
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
 ~[?:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
 ~[?:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
 ~[?:?]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
 ~[?:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
 ~[?:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) 
~[flink-scala_2.12-1.17.1.jar:1.17.1]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) 
~[flink-scala_2.12-1.17.1.jar:1.17.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) 
~[flink-scala_2.12-1.17.1.jar:1.17.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
~[flink-scala_2.12-1.17.1.jar:1.17.1]
at 

[jira] [Updated] (FLINK-34035) when flinksql with group by partition some errors field occured in jobmanager.log

2024-01-08 Thread hansonhe (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hansonhe updated FLINK-34035:
-
Description: 
flink.version=1.17.1
kyuubi.version=1.8.0
hive.version=3.1.2
When running some Hive SQL as follows:
CREATE CATALOG bidwhive WITH ('type' = 'hive', 'hive-version' = '3.1.2', 
'default-database' = 'test');
(1) select count(*) from bidwhive.test.dws_test where dt='2024-01-02';
+--------+
| EXPR$0 |
+--------+
|   1317 |
+--------+
This works. There are no errors anywhere.
(2) select dt, count(*) from bidwhive.test.dws_test where dt='2024-01-02' 
group by dt;

+------------+--------+
| dt         | EXPR$1 |
+------------+--------+
| 2024-01-02 |   1317 |
+------------+--------+

The query returns the correct result. But when I check jobmanager.log, I find 
that the following errors appear repeatedly. Sometimes the errors also appear 
on the client terminal. I don't know whether these errors affect the task at 
runtime. Could somebody help me take a look?

2024-01-09 14:03:25,979 WARN 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An 
exception occurred when fetching query results
java.util.concurrent.ExecutionException: org.apache.flink.util.FlinkException: 
Coordinator of operator e9a3cbdf90f308bdf13b34acd6410e2b does not exist or the 
job vertex this operator belongs to is not initialized.
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
~[?:1.8.0_191]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
~[?:1.8.0_191]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:170)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:129)
 [flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
 [flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
 [flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
 [flink-table-planner_b1e58bff-c004-4dba-b7d4-fff4e8145073.jar:1.17.1]
at 
org.apache.flink.table.gateway.service.result.ResultStore$ResultRetrievalThread.run(ResultStore.java:155)
 [flink-sql-gateway-1.17.1.jar:1.17.1]
Caused by: org.apache.flink.util.FlinkException: Coordinator of operator 
e9a3cbdf90f308bdf13b34acd6410e2b does not exist or the job vertex this operator 
belongs to is not initialized.
at 
org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverCoordinationRequestToCoordinator(DefaultOperatorCoordinatorHandler.java:135)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.deliverCoordinationRequestToCoordinator(SchedulerBase.java:1048)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.jobmaster.JobMaster.sendRequestToCoordinator(JobMaster.java:602)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.jobmaster.JobMaster.deliverCoordinationRequestToCoordinator(JobMaster.java:918)
 ~[flink-dist-1.17.1.jar:1.17.1]
at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source) ~[?:?]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_191]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
 ~[?:?]
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
 ~[?:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
 ~[?:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
 ~[?:?]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
 ~[?:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
 ~[?:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) 
~[flink-scala_2.12-1.17.1.jar:1.17.1]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) 
~[flink-scala_2.12-1.17.1.jar:1.17.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) 
~[flink-scala_2.12-1.17.1.jar:1.17.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
~[flink-scala_2.12-1.17.1.jar:1.17.1]
at 

[jira] [Updated] (FLINK-34035) when flinksql with group by partition some errors field occured in jobmanager.log

2024-01-08 Thread hansonhe (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hansonhe updated FLINK-34035:
-
Description: 
flink.version=1.17.1
kyuubi.version=1.8.0
hive.version=3.1.2
When running some Hive SQL as follows:
CREATE CATALOG bidwhive WITH ('type' = 'hive', 'hive-version' = '3.1.2', 
'default-database' = 'test');
(1) select count(*) from bidwhive.test.dws_test where dt='2024-01-02';
+--------+
| EXPR$0 |
+--------+
|   1317 |
+--------+
This works. There are no errors anywhere.
(2) select dt, count(*) from bidwhive.test.dws_test where dt='2024-01-02' 
group by dt;

+------------+--------+
| dt         | EXPR$1 |
+------------+--------+
| 2024-01-02 |   1317 |
+------------+--------+

The query returns the correct result. But when I check jobmanager.log, I find 
that the following errors appear repeatedly. Sometimes the errors also appear 
on the client terminal. I don't know whether these errors affect the task at 
runtime. Could somebody help me take a look?


2024-01-09 14:03:25,979 WARN  
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An 
exception occurred when fetching query results
java.util.concurrent.ExecutionException: org.apache.flink.util.FlinkException: 
Coordinator of operator e9a3cbdf90f308bdf13b34acd6410e2b does not exist or the 
job vertex this operator belongs to is not initialized.
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
~[?:1.8.0_191]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:170)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:129)
 [flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
 [flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
 [flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
 [flink-table-planner_b1e58bff-c004-4dba-b7d4-fff4e8145073.jar:1.17.1]
at 
org.apache.flink.table.gateway.service.result.ResultStore$ResultRetrievalThread.run(ResultStore.java:155)
 [flink-sql-gateway-1.17.1.jar:1.17.1]
Caused by: org.apache.flink.util.FlinkException: Coordinator of operator 
e9a3cbdf90f308bdf13b34acd6410e2b does not exist or the job vertex this operator 
belongs to is not initialized.
at 
org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverCoordinationRequestToCoordinator(DefaultOperatorCoordinatorHandler.java:135)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.deliverCoordinationRequestToCoordinator(SchedulerBase.java:1048)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.jobmaster.JobMaster.sendRequestToCoordinator(JobMaster.java:602)
 ~[flink-dist-1.17.1.jar:1.17.1]
at 
org.apache.flink.runtime.jobmaster.JobMaster.deliverCoordinationRequestToCoordinator(JobMaster.java:918)
 ~[flink-dist-1.17.1.jar:1.17.1]
at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source) ~[?:?]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_191]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
 ~[?:?]
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
 ~[?:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
 ~[?:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
 ~[?:?]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
 ~[?:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
 ~[?:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) 
~[flink-scala_2.12-1.17.1.jar:1.17.1]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) 
~[flink-scala_2.12-1.17.1.jar:1.17.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) 
~[?:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) 

Re: [PR] [FLINK-33738][Scheduler] Make exponential-delay restart-strategy the default restart strategy [flink]

2024-01-08 Thread via GitHub


qinf commented on PR #24040:
URL: https://github.com/apache/flink/pull/24040#issuecomment-1882535964

   Thank you @1996fanrui for your contribution, LGTM.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34035) when flinksql with group by partition some errors field occured in jobmanager.log

2024-01-08 Thread hansonhe (Jira)
hansonhe created FLINK-34035:


 Summary: when flinksql with group by partition some errors field 
occured in jobmanager.log
 Key: FLINK-34035
 URL: https://issues.apache.org/jira/browse/FLINK-34035
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.17.1
Reporter: hansonhe






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27756) Fix intermittently failing test in AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds

2024-01-08 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804604#comment-17804604
 ] 

Sergey Nuyanzin commented on FLINK-27756:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56124=logs=2e8cb2f7-b2d3-5c62-9c05-cd756d33a819=2dd510a3-5041-5201-6dc3-54d310f68906=10739

> Fix intermittently failing test in 
> AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds
> --
>
> Key: FLINK-27756
> URL: https://issues.apache.org/jira/browse/FLINK-27756
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.15.0, 1.17.0, 1.19.0
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0
>
>
> h2. Motivation
>  - One of the integration tests ({{checkLoggedSendTimesAreWithinBounds}}) of 
> {{AsyncSinkWriterTest}} has been reported to fail intermittently on the build 
> pipeline, blocking new changes.
>  - The reporting build is [linked 
> |https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36009=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203]





[jira] [Reopened] (FLINK-33414) MiniClusterITCase.testHandleStreamingJobsWhenNotEnoughSlot fails due to unexpected TimeoutException

2024-01-08 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin reopened FLINK-33414:
-

I'm afraid I have to reopen this issue, since 
testHandleStreamingJobsWhenNotEnoughSlot has now started to hang indefinitely on AZP:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56124=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7c1d86e3-35bd-5fd5-3b7c-30c126a78702=9813

{noformat}
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
Jan 09 02:01:16 at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
Jan 09 02:01:16 at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3313)
Jan 09 02:01:16 at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
Jan 09 02:01:16 at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
Jan 09 02:01:16 at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:1050)
Jan 09 02:01:16 at 
org.apache.flink.runtime.minicluster.MiniClusterITCase.runHandleJobsWhenNotEnoughSlots(MiniClusterITCase.java:152)
Jan 09 02:01:16 at 
org.apache.flink.runtime.minicluster.MiniClusterITCase.lambda$testHandleStreamingJobsWhenNotEnoughSlot$0(MiniClusterITCase.java:119)
Jan 09 02:01:16 at 
org.apache.flink.runtime.minicluster.MiniClusterITCase$$Lambda$1927/1144737794.call(Unknown
 Source)
Jan 09 02:01:16 at 
org.assertj.core.api.ThrowableAssert.catchThrowable(ThrowableAssert.java:63)
Jan 09 02:01:16 at 
org.assertj.core.api.AssertionsForClassTypes.catchThrowable(AssertionsForClassTypes.java:892)
Jan 09 02:01:16 at 
org.assertj.core.api.Assertions.catchThrowable(Assertions.java:1366)
Jan 09 02:01:16 at 
org.assertj.core.api.Assertions.assertThatThrownBy(Assertions.java:1210)
Jan 09 02:01:16 at 
org.apache.flink.runtime.minicluster.MiniClusterITCase.testHandleStreamingJobsWhenNotEnoughSlot(MiniClusterITCase.java:119)
Jan 09 02:01:16 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
Jan 09 02:01:16 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Jan 09 02:01:16 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

{noformat}
[~Jiang Xin] could you please have a look?
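
For context on the trace above: the test thread is parked in an untimed CompletableFuture.get(), which only returns once the future completes. The following is a minimal JDK-only sketch, not Flink code, of how a bounded wait surfaces a TimeoutException instead of blocking forever. The class name BoundedWaitSketch and the helper timesOut are hypothetical and exist only for this illustration; it makes no claim about the actual cause of the hang.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical illustration class; not part of Flink. */
final class BoundedWaitSketch {

    /** Returns true if the future did not complete within the given timeout. */
    static boolean timesOut(CompletableFuture<?> future, long timeoutMillis) {
        try {
            // Unlike the untimed get(), this call gives up after timeoutMillis.
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // A future that nobody ever completes: an untimed get() would block here.
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        System.out.println(timesOut(neverCompleted, 100)); // true

        // An already completed future returns immediately.
        System.out.println(timesOut(CompletableFuture.completedFuture("done"), 100)); // false
    }
}
```

A bounded wait like this turns a silent hang into a visible failure, which is usually easier to diagnose in CI logs.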

> MiniClusterITCase.testHandleStreamingJobsWhenNotEnoughSlot fails due to 
> unexpected TimeoutException
> ---
>
> Key: FLINK-33414
> URL: https://issues.apache.org/jira/browse/FLINK-33414
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Assignee: Jiang Xin
>Priority: Critical
>  Labels: github-actions, pull-request-available, test-stability
> Fix For: 1.19.0
>
>
> We see this test instability in [this 
> build|https://github.com/XComp/flink/actions/runs/6695266358/job/18192039035#step:12:9253].
> {code:java}
> Error: 17:04:52 17:04:52.042 [ERROR] Failures: 
> Error: 17:04:52 17:04:52.042 [ERROR]   
> MiniClusterITCase.testHandleStreamingJobsWhenNotEnoughSlot:120 
> Oct 30 17:04:52 Expecting a throwable with root cause being an instance 
> of:
> Oct 30 17:04:52   
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException
> Oct 30 17:04:52 but was an instance of:
> Oct 30 17:04:52   java.util.concurrent.TimeoutException: Timeout has 
> occurred: 100 ms
> Oct 30 17:04:52   at 
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
> Oct 30 17:04:52   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> Oct 30 17:04:52   at 
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> Oct 30 17:04:52   ...(27 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed) {code}
> The same error occurred in the [finegrained_resourcemanager stage of this 
> build|https://github.com/XComp/flink/actions/runs/6468655160/job/17563927249#step:11:26516]
>  (as reported in FLINK-33245).





[jira] [Closed] (FLINK-32570) Deprecate API that uses Flink's Time implementation (related to FLINK-14638)

2024-01-08 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl closed FLINK-32570.
-
Release Note: Flink's Time classes are now deprecated and will be removed 
with the release of Flink 2.0. Please use Java's own Duration class instead. 
Methods that accept Duration were introduced to replace the deprecated 
Time-based methods.
  Resolution: Fixed
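
As a rough illustration of the release note above, the sketch below shows only the JDK side of such a migration: constructing java.time.Duration values and converting them back to milliseconds. No Flink API is called, and the class DurationMigrationSketch and its helper methods are hypothetical names used for this example only.

```java
import java.time.Duration;

/** Hypothetical helper for illustration; not part of Flink. */
final class DurationMigrationSketch {

    /** Builds a timeout value in the form a Duration-based method would accept. */
    static Duration timeoutOfMillis(long millis) {
        return Duration.ofMillis(millis);
    }

    /** Converts a Duration back to milliseconds for code that still uses primitive longs. */
    static long toMillis(Duration duration) {
        return duration.toMillis();
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofSeconds(10);
        System.out.println(toMillis(timeout));     // 10000
        System.out.println(timeoutOfMillis(100));  // PT0.1S
    }
}
```

Which Flink methods accept Duration depends on the Flink version in use; the Javadoc of the deprecated Time-based method is the place to look for its replacement.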

> Deprecate API that uses Flink's Time implementation (related to FLINK-14638)
> 
>
> Key: FLINK-32570
> URL: https://issues.apache.org/jira/browse/FLINK-32570
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.19.0
>
>
> The plan is to resolve FLINK-14038 with Flink 2.0. In preparation, we have 
> to deprecate the related @Public API.





[jira] [Commented] (FLINK-34034) When kv hint and list hint handle duplicate query hints, the results are different.

2024-01-08 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804601#comment-17804601
 ] 

Jane Chan commented on FLINK-34034:
---

Hi [~xuyangzhong], thanks for reporting this issue.

According to the implementation, the description of the [hint 
doc|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/hints/#conflict-cases-in-join-hints]
 does not match the actual behavior.
{quote}If the Join Hints conflicts occur, Flink will choose the most matching 
one. Conflict in one same Join Hint strategy, Flink will choose the first 
matching table for a join. Conflict in different Join Hints strategies, Flink 
will choose the first matching hint for a join.
{quote}

However, I think having different behavior for list options and KV options 
makes sense. For example, a Java Map does not allow duplicate keys, while a 
List may contain duplicate items.
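
The Map/List analogy can be sketched with plain JDK collections. This is only an illustration under stated assumptions: it does not use Calcite's or Flink's hint classes, and the class DuplicateHintOptionsSketch, its methods, and the table name t1 are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration class; not the planner's hint representation. */
final class DuplicateHintOptionsSketch {

    /** KV-style options: putting the same key twice keeps only the later value. */
    static Map<String, String> kvOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("max-attempts", "3");
        options.put("max-attempts", "4"); // overwrites the previous value
        return options;
    }

    /** List-style options: duplicates are kept, so the consumer decides which one wins. */
    static List<String> listOptions() {
        List<String> options = new ArrayList<>();
        options.add("t1");
        options.add("t1"); // stored a second time
        return options;
    }

    public static void main(String[] args) {
        System.out.println(kvOptions());   // {max-attempts=4}
        System.out.println(listOptions()); // [t1, t1]
    }
}
```

With a map, last-write-wins falls out of the data structure itself, while with a list any first-match or last-match rule has to be chosen by the code that reads it.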

> When kv hint and list hint handle duplicate query hints, the results are 
> different.
> ---
>
> Key: FLINK-34034
> URL: https://issues.apache.org/jira/browse/FLINK-34034
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: xuyang
>Assignee: xuyang
>Priority: Minor
>
> When there are duplicate keys in the kv hint, Calcite will overwrite the 
> previous value with the later value.
> {code:java}
> @TestTemplate
> def test(): Unit = {
>   val sql =
> "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 
> 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3', 
> 'max-attempts'='4') */ * FROM MyTable AS T JOIN LookupTable " +
>   "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"
>   util.verifyExecPlan(sql)
> } {code}
> {code:java}
> Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, 
> id, name, age]) 
>   +- LookupJoin(table=[default_catalog.default_database.LookupTable], 
> joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
> name, age], retry=[lookup_miss, FIXED_DELAY, 1ms, 4]) 
> +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
> fields=[a, b, c, proctime, rowtime])
> {code}
> But when a list hint is duplicated (such as a join hint), we will choose the 
> first one as the effective hint.
>  





[jira] [Reopened] (FLINK-32570) Deprecate API that uses Flink's Time implementation (related to FLINK-14638)

2024-01-08 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl reopened FLINK-32570:
---

> Deprecate API that uses Flink's Time implementation (related to FLINK-14638)
> 
>
> Key: FLINK-32570
> URL: https://issues.apache.org/jira/browse/FLINK-32570
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.19.0
>
>
> The plan is to resolve FLINK-14038 with Flink 2.0. In preparation, we have 
> to deprecate the related @Public API.





[jira] [Commented] (FLINK-33325) FLIP-375: Built-in cross-platform powerful java profiler

2024-01-08 Thread Zhanghao Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804598#comment-17804598
 ] 

Zhanghao Chen commented on FLINK-33325:
---

Looking forward to the feature~

> FLIP-375: Built-in cross-platform powerful java profiler
> 
>
> Key: FLINK-33325
> URL: https://issues.apache.org/jira/browse/FLINK-33325
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST, Runtime / Web Frontend
>Affects Versions: 1.19.0
>Reporter: Yu Chen
>Assignee: Yu Chen
>Priority: Major
>
> This is an umbrella JIRA of 
> [FLIP-375|https://cwiki.apache.org/confluence/x/64lEE]





[jira] [Closed] (FLINK-33938) Correct implicit coercions in relational operators to adopt typescript 5.0

2024-01-08 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song closed FLINK-33938.

Resolution: Fixed

- master (1.19): e07545e458bd22099244a353ac29477ca3a13811
- release-1.18: 12463fbad39edc17af687c1421bba4623f924083
- release-1.17: ce62d45477447537088930ec116f7e18a2743166

> Correct implicit coercions in relational operators to adopt typescript 5.0
> --
>
> Key: FLINK-33938
> URL: https://issues.apache.org/jira/browse/FLINK-33938
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.19.0
>Reporter: Ao Yuchen
>Assignee: Ao Yuchen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0, 1.17.3, 1.18.2
>
>
> Since TypeScript 5.0, there is a breaking change: implicit coercions in 
> relational operators are forbidden [1].
> As a result, the following code in 
> flink-runtime-web/web-dashboard/src/app/components/humanize-date.pipe.ts 
> results in an error:
> {code:java}
> public transform(
>   value: number | string | Date,
>   ...
> ): string | null | undefined {
>   if (value == null || value === '' || value !== value || value < 0) {
>     return '-';
>   }
>   ...
> }{code}
> The correctness improvement is described in [2].
> I think we should update this kind of code for better compatibility.
>  
> [1] 
> [https://devblogs.microsoft.com/typescript/announcing-typescript-5-0/#forbidden-implicit-coercions-in-relational-operators]
> [2] 
> [https://github.com/microsoft/TypeScript/pull/52048]





[jira] [Commented] (FLINK-33681) Display source/sink numRecordsIn/Out & numBytesIn/Out on UI

2024-01-08 Thread Zhanghao Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804596#comment-17804596
 ] 

Zhanghao Chen commented on FLINK-33681:
---

[~renqs] A gentle reminder: would you mind taking a look at this issue? I think 
the feature is useful for debugging, and I would like to hear from an expert 
in this field. Many thanks in advance.

> Display source/sink numRecordsIn/Out & numBytesIn/Out on UI
> ---
>
> Key: FLINK-33681
> URL: https://issues.apache.org/jira/browse/FLINK-33681
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.18.0, 1.17.2
>Reporter: Zhanghao Chen
>Priority: Major
>  Labels: pull-request-available
> Attachments: screenshot-20231224-195605-1.png, 
> screenshot-20231225-120421.png
>
>
> Currently, the numRecordsIn & numBytesIn metrics for sources and the 
> numRecordsOut & numBytesOut metrics for sinks are always 0 on the Flink web 
> dashboard.
> FLINK-11576 brings us these metrics on the operator level, but it does not 
> integrate them on the task level. On the other hand, the summary metrics on 
> the job overview page are based on the task-level I/O metrics. As a result, 
> even though new connectors supporting FLIP-33 metrics will report 
> operator-level I/O metrics, we still cannot see the metrics on the dashboard.
> This ticket serves as an umbrella issue to integrate standard source/sink I/O 
> metrics with the corresponding task I/O metrics. 
> !screenshot-20231224-195605-1.png|width=608,height=333!





Re: [PR] [FLINK-33938][runtime-web] Correct implicit coercions in relational operators to adopt typescript 5.0 [flink]

2024-01-08 Thread via GitHub


xintongsong closed pull request #23999: [FLINK-33938][runtime-web] Correct 
implicit coercions in relational operators to adopt typescript 5.0
URL: https://github.com/apache/flink/pull/23999


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34034) When kv hint and list hint handle duplicate query hints, the results are different.

2024-01-08 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34034:
---
Component/s: Table SQL / Planner

> When kv hint and list hint handle duplicate query hints, the results are 
> different.
> ---
>
> Key: FLINK-34034
> URL: https://issues.apache.org/jira/browse/FLINK-34034
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: xuyang
>Assignee: xuyang
>Priority: Minor
>
> When there are duplicate keys in the kv hint, calcite will overwrite the 
> previous value with the later value.
> {code:java}
> @TestTemplate
> def test(): Unit = {
>   val sql =
>     "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3', 'max-attempts'='4') */ * FROM MyTable AS T JOIN LookupTable " +
>       "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"
>   util.verifyExecPlan(sql)
> }
> {code}
> {code:java}
> Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
> +- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age], retry=[lookup_miss, FIXED_DELAY, 1ms, 4])
>    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
> {code}
> But when a list hint is duplicated (such as a join hint), we will choose the 
> first one as the effective hint.
>  





[jira] [Assigned] (FLINK-34034) When kv hint and list hint handle duplicate query hints, the results are different.

2024-01-08 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan reassigned FLINK-34034:
-

Assignee: xuyang

> When kv hint and list hint handle duplicate query hints, the results are 
> different.
> ---
>
> Key: FLINK-34034
> URL: https://issues.apache.org/jira/browse/FLINK-34034
> Project: Flink
>  Issue Type: Bug
>Reporter: xuyang
>Assignee: xuyang
>Priority: Minor
>
> When there are duplicate keys in the kv hint, calcite will overwrite the 
> previous value with the later value.
> {code:java}
> @TestTemplate
> def test(): Unit = {
>   val sql =
>     "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3', 'max-attempts'='4') */ * FROM MyTable AS T JOIN LookupTable " +
>       "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"
>   util.verifyExecPlan(sql)
> }
> {code}
> {code:java}
> Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
> +- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age], retry=[lookup_miss, FIXED_DELAY, 1ms, 4])
>    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
> {code}
> But when a list hint is duplicated (such as a join hint), we will choose the 
> first one as the effective hint.
>  





[jira] [Commented] (FLINK-31472) AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread

2024-01-08 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804591#comment-17804591
 ] 

Sergey Nuyanzin commented on FLINK-31472:
-

Converting to critical since it started appearing very often

> AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread
> 
>
> Key: FLINK-31472
> URL: https://issues.apache.org/jira/browse/FLINK-31472
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.16.1, 1.19.0
>Reporter: Ran Tao
>Assignee: Ahmed Hamdy
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0
>
>
> When running mvn clean test, this case failed occasionally.
> {noformat}
> [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.955 
> s <<< FAILURE! - in 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest
> [ERROR] 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest.testSinkThroughputShouldThrottleToHalfBatchSize
>   Time elapsed: 0.492 s  <<< ERROR!
> java.lang.IllegalStateException: Illegal thread detected. This method must be 
> called from inside the mailbox thread!
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkIsMailboxThread(TaskMailboxImpl.java:262)
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:137)
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:84)
>         at 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.flush(AsyncSinkWriter.java:367)
>         at 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.lambda$registerCallback$3(AsyncSinkWriter.java:315)
>         at 
> org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService$CallbackTask.onProcessingTime(TestProcessingTimeService.java:199)
>         at 
> org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService.setCurrentTime(TestProcessingTimeService.java:76)
>         at 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriterThrottlingTest.testSinkThroughputShouldThrottleToHalfBatchSize(AsyncSinkWriterThrottlingTest.java:64)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>         at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>         at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>         at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>         at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>         at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>         at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>         at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>         at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>         at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>         at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>         at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>         at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>         at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>         at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>         at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>         at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>         at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>         at 
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
>         at 
> org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
>         at 
> org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
>         at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
>         at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
>         at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
> 
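
The IllegalStateException in the trace above is raised by a thread-ownership check (TaskMailboxImpl.checkIsMailboxThread). The following is a minimal sketch of that general pattern, not Flink's implementation: the class OwnerThreadGuard and its methods are hypothetical names chosen for illustration, and the only assumption is that a component remembers its owning thread and rejects calls from any other thread.

```java
final class OwnerThreadGuard {

    // The thread that is allowed to call guarded methods (hypothetical model).
    private final Thread owner;

    OwnerThreadGuard(Thread owner) {
        this.owner = owner;
    }

    boolean isOwnerThread() {
        return Thread.currentThread() == owner;
    }

    /** Throws if the caller is not running on the owning thread. */
    void checkIsOwnerThread() {
        if (!isOwnerThread()) {
            throw new IllegalStateException(
                    "Illegal thread detected. This method must be called from inside the owner thread!");
        }
    }

    /** Runs the check on a freshly started thread and reports whether it was rejected. */
    static boolean rejectedFromOtherThread(OwnerThreadGuard guard) throws InterruptedException {
        final boolean[] rejected = {false};
        Thread other = new Thread(() -> {
            try {
                guard.checkIsOwnerThread();
            } catch (IllegalStateException e) {
                rejected[0] = true;
            }
        });
        other.start();
        other.join(); // join() makes the write to rejected[0] visible here
        return rejected[0];
    }

    public static void main(String[] args) throws InterruptedException {
        OwnerThreadGuard guard = new OwnerThreadGuard(Thread.currentThread());
        guard.checkIsOwnerThread(); // passes: called on the owning thread
        System.out.println("rejected from other thread: " + rejectedFromOtherThread(guard));
    }
}
```

A test that drives timer callbacks from a thread other than the owner would trip such a check only when the callback reaches the guarded method, which is one way an occasional rather than constant failure could arise.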

[jira] [Commented] (FLINK-31472) AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread

2024-01-08 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804590#comment-17804590
 ] 

Sergey Nuyanzin commented on FLINK-31472:
-

Thanks for looking into this [~chalixar]

> AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread
> 
>
> Key: FLINK-31472
> URL: https://issues.apache.org/jira/browse/FLINK-31472
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.16.1, 1.19.0
>Reporter: Ran Tao
>Assignee: Ahmed Hamdy
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0
>
>
> When running mvn clean test, this case failed occasionally.

[jira] [Commented] (FLINK-31472) AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread

2024-01-08 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804589#comment-17804589
 ] 

Sergey Nuyanzin commented on FLINK-31472:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56124=logs=1c002d28-a73d-5309-26ee-10036d8476b4=d1c117a6-8f13-5466-55f0-d48dbb767fcd=10413

> AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread
> 
>
> Key: FLINK-31472
> URL: https://issues.apache.org/jira/browse/FLINK-31472
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.16.1, 1.19.0
>Reporter: Ran Tao
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0
>
>
> When running mvn clean test, this case failed occasionally.

[jira] [Updated] (FLINK-31472) AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread

2024-01-08 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-31472:

Priority: Critical  (was: Major)

> AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread
> 
>
> Key: FLINK-31472
> URL: https://issues.apache.org/jira/browse/FLINK-31472
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.16.1, 1.19.0
>Reporter: Ran Tao
>Assignee: Ahmed Hamdy
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0
>
>
> When running mvn clean test, this case failed occasionally.

[jira] [Created] (FLINK-34034) When kv hint and list hint handle duplicate query hints, the results are different.

2024-01-08 Thread xuyang (Jira)
xuyang created FLINK-34034:
--

 Summary: When kv hint and list hint handle duplicate query hints, 
the results are different.
 Key: FLINK-34034
 URL: https://issues.apache.org/jira/browse/FLINK-34034
 Project: Flink
  Issue Type: Bug
Reporter: xuyang


When there are duplicate keys in the kv hint, calcite will overwrite the 
previous value with the later value.
{code:java}
@TestTemplate
def test(): Unit = {
  val sql =
    "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3', 'max-attempts'='4') */ * FROM MyTable AS T JOIN LookupTable " +
      "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"
  util.verifyExecPlan(sql)
}
{code}
{code:java}
Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
+- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age], retry=[lookup_miss, FIXED_DELAY, 1ms, 4])
   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
{code}
But when a list hint is duplicated (such as a join hint), we will choose the 
first one as the effective hint.
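
The two behaviours described above can be contrasted with plain Java collections. This is only an illustrative sketch, not Calcite or Flink planner code: DuplicateHintDemo, mergeKvOptions and pickFirstListHint are hypothetical names, and the duplicated BROADCAST hint is an assumed example of a "list hint" given twice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DuplicateHintDemo {

    /** Key-value options: a later duplicate key overwrites the earlier value (last wins). */
    static Map<String, String> mergeKvOptions(List<String[]> options) {
        Map<String, String> merged = new LinkedHashMap<>();
        for (String[] kv : options) {
            merged.put(kv[0], kv[1]);
        }
        return merged;
    }

    /** List-style hints: the first hint seen for a name stays effective (first wins). */
    static Map<String, List<String>> pickFirstListHint(List<String[]> hints) {
        Map<String, List<String>> effective = new LinkedHashMap<>();
        for (String[] hint : hints) {
            // hint[0] is the hint name; the remaining elements are its arguments.
            List<String> hintArgs = new ArrayList<>(Arrays.asList(hint).subList(1, hint.length));
            effective.putIfAbsent(hint[0], hintArgs);
        }
        return effective;
    }

    public static void main(String[] args) {
        // Mirrors 'max-attempts'='3', 'max-attempts'='4' from the LOOKUP hint.
        Map<String, String> kv = mergeKvOptions(Arrays.asList(
                new String[] {"max-attempts", "3"},
                new String[] {"max-attempts", "4"}));
        System.out.println(kv);

        // Assumed example of the same list hint given twice with different arguments.
        Map<String, List<String>> list = pickFirstListHint(Arrays.asList(
                new String[] {"BROADCAST", "T1"},
                new String[] {"BROADCAST", "T2"}));
        System.out.println(list);
    }
}
```

The difference comes down to `put` versus `putIfAbsent`: whichever rule is chosen, applying the same one to both hint kinds would remove the inconsistency reported here.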

 





Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]

2024-01-08 Thread via GitHub


JunRuiLee commented on PR #24025:
URL: https://github.com/apache/flink/pull/24025#issuecomment-1882495740

   > Hi @JunRuiLee, would you mind rebasing onto the master branch to fix the CI? 
Thanks~
   
   Hi @1996fanrui , I am currently waiting for the merge of 
https://github.com/apache/flink/pull/24046. Once that has been completed, I'll 
go ahead and remove the hotfix commit 6ba47965e234b6524f10dbe0d933e5e5dff6627, 
and then I will rebase onto the updated master branch. Thanks for your patience!
   





Re: [PR] [FLINK-33583][table-planner] Support state ttl hint for regular join [flink]

2024-01-08 Thread via GitHub


LadyForest commented on PR #23752:
URL: https://github.com/apache/flink/pull/23752#issuecomment-1882495417

   1. Change state metadata time unit to adapt to changes introduced by 
https://github.com/apache/flink/pull/23873
   2. Cope with duplicate hints/hint KV options





Re: [PR] fix: missing serialVersionUID of serializable classes [flink]

2024-01-08 Thread via GitHub


flinkbot commented on PR #24047:
URL: https://github.com/apache/flink/pull/24047#issuecomment-1882480603

   
   ## CI report:
   
   * 57136eb1fbcb1eb408d7abe73200a468cf026a10 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] fix: missing serialVersionUID of serializable classes [flink]

2024-01-08 Thread via GitHub


rng70-or opened a new pull request, #24047:
URL: https://github.com/apache/flink/pull/24047

   
   
   ## What is the purpose of the change
   
   Several files contain classes that are serializable but do not declare a 
serialVersionUID field. In such cases the serialization runtime computes a 
default value, but that value is sensitive to class details that can vary 
between compiler implementations and may cause unwanted problems during 
deserialization.
   
   ## The Role of serialVersionUID:
   
   The primary role of serialVersionUID is to provide version control during 
deserialization. When deserializing an object, the JVM checks whether the 
serialVersionUID of the serialized data matches the serialVersionUID of the 
class on the current classpath. If they match, deserialization proceeds 
without issues. If they do not match, an InvalidClassException is thrown.
   
   Because serialVersionUID serves as version control for a class during 
serialization and deserialization, omitting it risks breaking backward 
compatibility when classes change, which can lead to unexpected issues and 
errors during deserialization.
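
As a rough illustration of the point above, the sketch below declares an explicit serialVersionUID and round-trips an object through Java serialization. It is not taken from this pull request: SerialVersionUidDemo, Checkpoint and the helper methods are hypothetical names, and the value 1L is an arbitrary choice.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

final class SerialVersionUidDemo {

    /** Hypothetical serializable class; the explicit field pins its stream version. */
    static final class Checkpoint implements Serializable {
        private static final long serialVersionUID = 1L;

        final String name;

        Checkpoint(String name) {
            this.name = name;
        }
    }

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            // readObject compares the stream's UID with the local class's UID and
            // throws InvalidClassException on a mismatch.
            return in.readObject();
        }
    }

    /** Returns the UID the serialization runtime will use for the given class. */
    static long effectiveUid(Class<?> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }

    public static void main(String[] args) throws Exception {
        Checkpoint restored = (Checkpoint) deserialize(serialize(new Checkpoint("cp-1")));
        System.out.println(restored.name);
        System.out.println(effectiveUid(Checkpoint.class));
    }
}
```

With the field declared, compatible changes to the class (for example adding a field) keep the same UID, so previously serialized data can still be read; without it, the computed default would typically change along with the class.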
   
   ## Does this pull request potentially affect one of the following parts:
 - The serializers: (yes)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   
   ## Sponsorship and Support:
   
   This work is done by the security researchers from OpenRefactory and is 
supported by the [Open Source Security Foundation 
(OpenSSF)](https://openssf.org/): [Project 
Alpha-Omega](https://alpha-omega.dev/). Alpha-Omega is a project partnering 
with open source software project maintainers to systematically find new, 
as-yet-undiscovered vulnerabilities in open source code - and get them fixed – 
to improve global software supply chain security.
   
   The bug was found by running the Intelligent Code Repair (iCR) tool by 
OpenRefactory and then manually triaging the results.
   





Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]

2024-01-08 Thread via GitHub


yunfengzhou-hub commented on PR #23927:
URL: https://github.com/apache/flink/pull/23927#issuecomment-1882465840

   Hi @reswqa Could you please take a look at this PR?





Re: [PR] [FLINK-33583][table-planner] Support state ttl hint for regular join [flink]

2024-01-08 Thread via GitHub


LadyForest commented on code in PR #23752:
URL: https://github.com/apache/flink/pull/23752#discussion_r1445599937


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java:
##
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.hints.stream;
+
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.utils.PlanKind;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import scala.Enumeration;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link org.apache.flink.table.planner.hint.StateTtlHint}. */
+class StateTtlHintTest extends TableTestBase {
+
+    protected StreamTableTestUtil util;
+
+    @BeforeEach
+    void before() {
+        util = streamTestUtil(TableConfig.getDefault());
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE T1 (\n"
+                                + "  a1 BIGINT,\n"
+                                + "  b1 VARCHAR\n"
+                                + ") WITH (\n"
+                                + " 'connector' = 'values'\n"
+                                + ")");
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE T2 (\n"
+                                + "  a2 BIGINT,\n"
+                                + "  b2 VARCHAR\n"
+                                + ") WITH (\n"
+                                + " 'connector' = 'values'\n"
+                                + ")");
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE T3 (\n"
+                                + "  a3 BIGINT,\n"
+                                + "  b3 VARCHAR\n"
+                                + ") WITH (\n"
+                                + " 'connector' = 'values'\n"
+                                + ")");
+
+        util.tableEnv().executeSql("CREATE View V4 as select a3 as a4, b3 as b4 from T3");
+
+        util.tableEnv()
+                .executeSql("create view V5 as select T1.* from T1 join T2 on T1.a1 = T2.a2");
+    }
+
+    @Test
+    void testSimpleJoinStateTtlHintWithEachSide() {
+        String sql =
+                "select /*+ STATE_TTL('T2' = '2d', 'T1' = '1d') */* from T1 join T2 on T1.a1 = T2.a2";
+        verify(sql);
+    }
+
+    @Test
+    void testJoinStateTtlHintOnlyWithRightSide() {
+        String sql = "select /*+ STATE_TTL('T2' = '2d') */* from T1 join T2 on T1.a1 = T2.a2";
+        verify(sql);
+    }
+
+    @Test
+    void testJoinStateTtlHintWithContinuousJoin() {
+        String sql =
+                "select /*+ STATE_TTL('T2' = '2d', 'T3' = '3d', 'T1' = '1d') */* from T1, T2, T3 where T1.a1 = T2.a2 and T2.b2 = T3.b3";
+        verify(sql);
+    }
+
+    @Test
+    void testJoinStateTtlHintWithMultiLevelJoin() {
+        String sql =
+                "select /*+ STATE_TTL('T2' = '2d', 'T3' = '3d', 'T1' = '1d') */* from T1 "
+                        + "join (select T2.* from T2 join T3 on T2.b2 = T3.b3) TMP on T1.a1 = TMP.b2";
+        assertThatThrownBy(() -> verify(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The options of following hints cannot match the name of input tables or views: \n`%s` in `%s`",
+                        "T2, T3", "STATE_TTL");
+    }
+
+    @Test
+    void testJoinStateTtlHintWithOneUnknownTable() {
+        String sql =
+                "select /*+ STATE_TTL('T5' = '2d', 'T1' = '1d') */* from T1 join T2 on T1.a1 = T2.a2";
+
+        assertThatThrownBy(() -> verify(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The options of following hints cannot match the name of input tables or views: \n`%s` in `%s`",
+

[jira] [Commented] (FLINK-33996) Support disabling project rewrite when multiple exprs in the project reference the same sub project field.

2024-01-08 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804577#comment-17804577
 ] 

Benchao Li commented on FLINK-33996:


bq. In addition, what I would like to ask is whether the current optimization 
rule will definitely make the same project rewrite into the same cal node, at 
least in my actual tests, this is the behavior.

[~hackergin] Whether to merge them is decided by the cost-based planner; 
rules only suggest alternatives for the final result.

> Support disabling project rewrite when multiple exprs in the project 
> reference the same sub project field.
> --
>
> Key: FLINK-33996
> URL: https://issues.apache.org/jira/browse/FLINK-33996
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Feng Jin
>Priority: Major
>  Labels: pull-request-available
>
> When multiple top projects reference the same bottom project, project rewrite 
> rules may result in complex projects being calculated multiple times.
> Take the following SQL as an example:
> {code:sql}
> create table test_source(a varchar) with ('connector'='datagen');
> explain plan for select a || 'a' as a, a || 'b' as b FROM (select 
> REGEXP_REPLACE(a, 'aaa', 'bbb') as a FROM test_source);
> {code}
> The final SQL plan is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>    +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])
> == Optimized Physical Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), _UTF-16LE'a') AS a, ||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), _UTF-16LE'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
> == Optimized Execution Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'a') AS a, ||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
> {code}
> It can be observed that after the project rewrite, REGEXP_REPLACE is 
> calculated twice. Generally speaking, regular expression matching is a 
> time-consuming operation and we usually do not want it to be calculated 
> multiple times. Therefore, for this scenario, we can support disabling 
> project rewrite.
> After disabling some rules, the final plan we obtained is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>    +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])
> == Optimized Physical Plan ==
> Calc(select=[||(a, _UTF-16LE'a') AS a, ||(a, _UTF-16LE'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb') AS a])
>    +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
> == Optimized Execution Plan ==
> Calc(select=[||(a, 'a') AS a, ||(a, 'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, 'aaa', 'bbb') AS a])
>    +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
> {code}
> After testing, we probably need to modify these few rules:
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkCalcMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectCalcMergeRule
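
The cost difference between the two plans quoted above can be illustrated with a small stand-alone sketch. This is not planner code: `ProjectMergeCost`, `merged`, `twoStage` and the call counter are hypothetical names used only to model "one Calc with the inner expression inlined twice" versus "two Calc nodes", and `String.replaceAll` stands in for REGEXP_REPLACE.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical model of the two plans; none of these names exist in Flink.
class ProjectMergeCost {
    static final AtomicInteger REGEX_CALLS = new AtomicInteger();

    // Stand-in for REGEXP_REPLACE(a, 'aaa', 'bbb'); counts how often it runs.
    static String regexpReplace(String a) {
        REGEX_CALLS.incrementAndGet();
        return a.replaceAll("aaa", "bbb");
    }

    // Merged plan: the inner expression is inlined into both outer expressions.
    static String[] merged(String a) {
        return new String[] {regexpReplace(a) + "a", regexpReplace(a) + "b"};
    }

    // Two-stage plan: the inner projection is evaluated once and then reused.
    static String[] twoStage(String a) {
        String inner = regexpReplace(a);
        return new String[] {inner + "a", inner + "b"};
    }

    // Returns how many regex evaluations one input row costs under the given plan.
    static int callsFor(Function<String, String[]> plan, String row) {
        REGEX_CALLS.set(0);
        plan.apply(row);
        return REGEX_CALLS.get();
    }

    public static void main(String[] args) {
        System.out.println("merged:    " + callsFor(ProjectMergeCost::merged, "xaaax"));
        System.out.println("two-stage: " + callsFor(ProjectMergeCost::twoStage, "xaaax"));
    }
}
```

Both variants produce the same output row; only the number of regex evaluations per row differs, which is the overhead the ticket wants to be able to avoid.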



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-31481][table] Support enhanced show databases syntax [flink]

2024-01-08 Thread via GitHub


jeyhunkarimov commented on PR #23612:
URL: https://github.com/apache/flink/pull/23612#issuecomment-1882424567

   @flinkbot run azure





Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]

2024-01-08 Thread via GitHub


1996fanrui commented on PR #24025:
URL: https://github.com/apache/flink/pull/24025#issuecomment-1882410385

   Hi @JunRuiLee, would you mind rebasing onto the master branch to fix the CI? 
Thanks~





[jira] [Commented] (FLINK-34032) Cleanup local-recovery dir when switching local-recovery from enabled to disabled

2024-01-08 Thread Jinzhong Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804570#comment-17804570
 ] 

Jinzhong Li commented on FLINK-34032:
-

Hi [~masteryhx], would you please clarify the specific scenario of this issue?

IIUC, in most cases, when switching local-recovery from enabled to disabled, 
the user needs to restart the job. 
And the local-recovery dir will be cleaned up when the k8s pod or yarn 
container is killed during the job restart.

> Cleanup local-recovery dir when switching local-recovery from enabled to 
> disabled
> -
>
> Key: FLINK-34032
> URL: https://issues.apache.org/jira/browse/FLINK-34032
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Hangxiang Yu
>Priority: Major
>
> When switching local-recovery from enabled to disabled, the local-recovery 
> dir could not be cleaned.
> In particular, for a job that switched multiple times, lots of historical 
> local checkpoints will be retained forever.





Re: [PR] [hotfix] Extract common preparing directory code into one function [flink-benchmarks]

2024-01-08 Thread via GitHub


masteryhx closed pull request #84: [hotfix] Extract common preparing directory 
code into one function
URL: https://github.com/apache/flink-benchmarks/pull/84





Re: [PR] [hotfix] Extract common preparing directory code into one function [flink-benchmarks]

2024-01-08 Thread via GitHub


masteryhx commented on PR #84:
URL: https://github.com/apache/flink-benchmarks/pull/84#issuecomment-1882394452

   merged 8b0b1c62 into master





[jira] [Commented] (FLINK-33950) Update max aggregate functions to new type system

2024-01-08 Thread Jacky Lau (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804569#comment-17804569
 ] 

Jacky Lau commented on FLINK-33950:
---

Hi [~martijnvisser], what is your opinion?

> Update max aggregate functions to new type system
> -
>
> Key: FLINK-33950
> URL: https://issues.apache.org/jira/browse/FLINK-33950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: Jacky Lau
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>






[jira] [Created] (FLINK-34033) flink json supports raw type

2024-01-08 Thread Jacky Lau (Jira)
Jacky Lau created FLINK-34033:
-

 Summary: flink json supports raw type 
 Key: FLINK-34033
 URL: https://issues.apache.org/jira/browse/FLINK-34033
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.19.0
Reporter: Jacky Lau
 Fix For: 1.19.0


When users write to Elasticsearch fields of type nested/object, they can use 
complex types such as ROW/ARRAY/MAP.

However, this is inconvenient when the structure of the object is not fixed, 
as it has to be for ROW. For example, users may use a UDF to produce such data 
and insert it into Elasticsearch. We could support the RAW type for this case:
{code:sql}
CREATE TABLE es_sink (
 `string` VARCHAR, 
  nested RAW('java.lang.Object', 
'AEdvcmcuYXBhY2hlLmZsaW5rLmFwaS5qYXZhLnR5cGV1dGlscy5ydW50aW1lLmtyeW8uS3J5b1NlcmlhbGl6ZXJTbmFwc2hvdAIAEGphdmEubGFuZy5PYmplY3QAAATyxpo9cAIAEGphdmEubGFuZy5PYmplY3QBEgAQamF2YS5sYW5nLk9iamVjdAEWABBqYXZhLmxhbmcuT2JqZWN0AAApb3JnLmFwYWNoZS5hdnJvLmdlbmVyaWMuR2VuZXJpY0RhdGEkQXJyYXkBKwApb3JnLmFwYWNoZS5hdnJvLmdlbmVyaWMuR2VuZXJpY0RhdGEkQXJyYXkBtgBVb3JnLmFwYWNoZS5mbGluay5hcGkuamF2YS50eXBldXRpbHMucnVudGltZS5rcnlvLlNlcmlhbGl6ZXJzJER1bW15QXZyb1JlZ2lzdGVyZWRDbGFzcwEAWW9yZy5hcGFjaGUuZmxpbmsuYXBpLmphdmEudHlwZXV0aWxzLnJ1bnRpbWUua3J5by5TZXJpYWxpemVycyREdW1teUF2cm9LcnlvU2VyaWFsaXplckNsYXNzAAAE8saaPXAE8saaPXAA'),
  object RAW('java.lang.Object', 
'AEdvcmcuYXBhY2hlLmZsaW5rLmFwaS5qYXZhLnR5cGV1dGlscy5ydW50aW1lLmtyeW8uS3J5b1NlcmlhbGl6ZXJTbmFwc2hvdAIAEGphdmEubGFuZy5PYmplY3QAAATyxpo9cAIAEGphdmEubGFuZy5PYmplY3QBEgAQamF2YS5sYW5nLk9iamVjdAEWABBqYXZhLmxhbmcuT2JqZWN0AAApb3JnLmFwYWNoZS5hdnJvLmdlbmVyaWMuR2VuZXJpY0RhdGEkQXJyYXkBKwApb3JnLmFwYWNoZS5hdnJvLmdlbmVyaWMuR2VuZXJpY0RhdGEkQXJyYXkBtgBVb3JnLmFwYWNoZS5mbGluay5hcGkuamF2YS50eXBldXRpbHMucnVudGltZS5rcnlvLlNlcmlhbGl6ZXJzJER1bW15QXZyb1JlZ2lzdGVyZWRDbGFzcwEAWW9yZy5hcGFjaGUuZmxpbmsuYXBpLmphdmEudHlwZXV0aWxzLnJ1bnRpbWUua3J5by5TZXJpYWxpemVycyREdW1teUF2cm9LcnlvU2VyaWFsaXplckNsYXNzAAAE8saaPXAE8saaPXAA'),
  PRIMARY KEY (`string`) NOT ENFORCED
) WITH
('connector'='elasticsearch'); {code}
The Elasticsearch connector currently depends on flink-json, so we can make 
flink-json support the RAW type.





[jira] [Reopened] (FLINK-32978) Deprecate RichFunction#open(Configuration parameters)

2024-01-08 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song reopened FLINK-32978:
--

Reopening the ticket to fix the unexpected breaking changes.

> Deprecate RichFunction#open(Configuration parameters)
> -
>
> Key: FLINK-32978
> URL: https://issues.apache.org/jira/browse/FLINK-32978
> Project: Flink
>  Issue Type: Technical Debt
>  Components: API / Core
>Affects Versions: 1.19.0
>Reporter: Wencong Liu
>Assignee: Wencong Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> The 
> [FLIP-344|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425231]
>  has decided that the parameter in RichFunction#open will be removed in the 
> next major version. We should deprecate it now and remove it in Flink 2.0. 
> The removal will be tracked in 
> [FLINK-6912|https://issues.apache.org/jira/browse/FLINK-6912].





[jira] [Commented] (FLINK-32978) Deprecate RichFunction#open(Configuration parameters)

2024-01-08 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804566#comment-17804566
 ] 

Xintong Song commented on FLINK-32978:
--

Thanks for reporting this, [~Sergey Nuyanzin].

You're right, this is indeed a problem.

I think the interface change itself is non-breaking. The problem is that we 
also migrated the built-in implementations from the old interface to the new 
one. That should be fine for internal classes, but would become a breaking 
change for @Public / @PublicEvolving classes which might be overridden by user 
code.

[~Wencong Liu], could you please look into this?
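
One way such a break can arise is sketched below as a simplified model. All types here (`OldConfig`, `NewContext`, `RichFn`, `BuiltInFn`, `UserFn`) are stand-ins invented for illustration, not Flink's real classes, and the sketch assumes the runtime only invokes the new `open` variant.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in types: NOT Flink's real classes, only a model of the two open() variants.
class OldConfig {}

class NewContext {}

interface RichFn {
    // Old, deprecated entry point.
    default void open(OldConfig parameters) {}

    // New entry point; by default it delegates to the old one, so the
    // interface change by itself keeps legacy implementations working.
    default void open(NewContext context) {
        open(new OldConfig());
    }
}

// A "built-in" public class after the migration: its setup moved to the new method.
class BuiltInFn implements RichFn {
    final List<String> calls = new ArrayList<>();

    @Override
    public void open(NewContext context) {
        calls.add("builtin-new");
    }
}

// User code written against the old API, extending the built-in class.
class UserFn extends BuiltInFn {
    @Override
    public void open(OldConfig parameters) {
        calls.add("user-old");
    }
}

class OpenMigrationDemo {
    // Models the runtime, which is assumed to call only the new method.
    static List<String> run(BuiltInFn fn) {
        fn.open(new NewContext());
        return fn.calls;
    }

    public static void main(String[] args) {
        // The user's override of the old method is never reached.
        System.out.println(run(new UserFn()));
    }
}
```

In this model the user's override is silently skipped because the built-in class no longer goes through the delegating default, which matches the distinction above between changing the interface and migrating public implementations.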






[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2024-01-08 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804564#comment-17804564
 ] 

Xintong Song commented on FLINK-33728:
--

Thanks for pulling me in.

I'm also concerned about relying solely on the heartbeat timeout to detect pod 
failures. In addition to the cleaning-up issue, it can also delay the detection 
of pod failures in many cases.

IIUC, the problem we are trying to solve here is to avoid massive numbers of 
Flink jobs trying to re-create watches at the same time. That doesn't 
necessarily lead to the proposed solution.
1. I think this is not a problem of individual Flink jobs, but a problem of the 
K8s cluster that runs massive Flink workloads. Ideally, such problems, i.e. how 
to better deal with the massive workloads, should be solved on the K8s cluster 
side. However, I don't have the expertise to come up with a cluster-side 
solution.
2. If 1) is not feasible, I think we can introduce a random backoff. Users may 
configure a max backoff time (default 0), and Flink randomly picks a time that 
is no greater than the max before re-creating the watch. Ideally, that would 
spread the pressure on the API server over a longer and configurable period.

WDYT?
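
A minimal sketch of the random backoff in option 2, under stated assumptions: `WatchBackoff` and `pickDelay` are hypothetical names, no such option exists in Flink as far as this thread shows, and the maximum is passed in as a plain `Duration` instead of being read from configuration.

```java
import java.time.Duration;
import java.util.Random;

// Hypothetical helper; illustrates the proposal only, not existing Flink code.
class WatchBackoff {

    // Picks a delay uniformly from [0, maxBackoff]. A max of zero (the
    // suggested default) means "re-create the watch immediately".
    static Duration pickDelay(Duration maxBackoff, Random random) {
        long maxMillis = maxBackoff.toMillis();
        if (maxMillis <= 0) {
            return Duration.ZERO;
        }
        // nextDouble() is in [0, 1); the clamp guards against rounding at the upper end.
        long delayMillis = Math.min(maxMillis, (long) (random.nextDouble() * (maxMillis + 1)));
        return Duration.ofMillis(delayMillis);
    }

    public static void main(String[] args) {
        Random random = new Random();
        // Each job would draw its own delay, so re-creations spread over up to 30 s.
        for (int job = 0; job < 5; job++) {
            System.out.println("job " + job + " waits " + pickDelay(Duration.ofSeconds(30), random));
        }
    }
}
```

With thousands of jobs drawing independent delays, the watch re-creations are spread across the configured window instead of hitting the API server at the same moment.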

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> I hit a massive production problem when Kubernetes etcd was slow to respond. 
> After Kubernetes recovered an hour later, thousands of Flink jobs using 
> KubernetesResourceManagerDriver re-created their watches upon receiving 
> ResourceVersionTooOld, which put great pressure on the API server and made 
> it fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method.
>  
> We could simply ignore the disconnection of the watch and try to re-create it 
> once a new requestResource is called. And we could rely on the Akka heartbeat 
> timeout to discover TM failures, just like YARN mode does.





Re: [PR] [FLINK-33583][table-planner] Support state ttl hint for regular join [flink]

2024-01-08 Thread via GitHub


LadyForest commented on code in PR #23752:
URL: https://github.com/apache/flink/pull/23752#discussion_r1445602009


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala:
##
@@ -116,13 +123,27 @@ class StreamPhysicalJoin(
   }
 
   override def translateToExecNode(): ExecNode[_] = {
+    val stateTtlFromHint = new util.HashMap[JInt, JLong]
+    getHints
+      .filter(hint => StateTtlHint.isStateTtlHint(hint.hintName))
+      .forEach {
+        hint =>
+          hint.kvOptions.forEach(
+            (input, ttl) =>
+              stateTtlFromHint
+                .put(
+                  if (input == FlinkHints.LEFT_INPUT) 0 else 1,
Review Comment:
   > In the explain syntax, if the user uses "1d" in the state ttl hint, I 
think it is easier to understand by displaying "1d" than by displaying 
"86400000 ms".
   
   The key lies in whether the explained digest requires a normalized format of 
everything. If there's no explicit constraint, I'm ok with leaving it unchanged.
   
   > The redundancy of the code is only about the 
"TimeUtils.parseDuration(ttl).toMillis" section, which I think is tolerable. 
Otherwise, based on the first consideration, we will add 
"TimeUtils.parseDuration(ttl)" to explain, which will also cause some 
redundancy.
   
   Actually, I was suggesting adding a normalizer, like 
`CapitalizeQueryHintsShuttle`, instead of formatting it in the `RelWriterImpl`.
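
   To make the normalization question concrete, here is a minimal stand-alone sketch of turning a TTL hint value such as "1d" into a canonical millisecond form. `TtlNormalizer` is a hypothetical name, and the tiny parser only stands in for `TimeUtils.parseDuration(ttl).toMillis`; it is an assumption that it covers the units relevant here, and it is not the PR's implementation.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; a simplified stand-in for TimeUtils.parseDuration(ttl).toMillis.
class TtlNormalizer {
    private static final Pattern TTL = Pattern.compile("(\\d+)\\s*(ms|s|min|h|d)");

    // Converts values such as "1d", "24h" or "500 ms" to milliseconds.
    static long toMillis(String ttl) {
        Matcher m = TTL.matcher(ttl.trim().toLowerCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported TTL value: " + ttl);
        }
        long value = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "ms":
                return value;
            case "s":
                return value * 1_000L;
            case "min":
                return value * 60_000L;
            case "h":
                return value * 3_600_000L;
            default: // "d", the only remaining unit the pattern allows
                return value * 86_400_000L;
        }
    }

    // Canonical text form, so that equivalent hints yield the same digest.
    static String normalize(String ttl) {
        return toMillis(ttl) + " ms";
    }

    public static void main(String[] args) {
        System.out.println(normalize("1d"));
        System.out.println(normalize("24h"));
    }
}
```

   A normalizer along these lines would make '1d' and '24h' indistinguishable in the digest, which is the trade-off against readability discussed above.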






Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]

2024-01-08 Thread via GitHub


mas-chen commented on PR #44:
URL: 
https://github.com/apache/flink-connector-kafka/pull/44#issuecomment-1882369061

   @tzulitai Thanks for the review. I would like to get more input from at 
least one other reviewer before merging this, and I agree this should be 
included in the 3.1.0 release. Perhaps @mxm?
   
   After this PR, I still need to complete the documentation for this new 
connector.





Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]

2024-01-08 Thread via GitHub


mas-chen commented on code in PR #44:
URL: 
https://github.com/apache/flink-connector-kafka/pull/44#discussion_r1445596890


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/DynamicKafkaSource.java:
##
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import 
org.apache.flink.connector.kafka.source.enumerator.DynamicKafkaSourceEnumState;
+import 
org.apache.flink.connector.kafka.source.enumerator.DynamicKafkaSourceEnumStateSerializer;
+import 
org.apache.flink.connector.kafka.source.enumerator.DynamicKafkaSourceEnumerator;
+import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import 
org.apache.flink.connector.kafka.source.enumerator.metadata.KafkaMetadataService;
+import 
org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaStreamSubscriber;
+import org.apache.flink.connector.kafka.source.reader.DynamicKafkaSourceReader;
+import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.connector.kafka.source.split.DynamicKafkaSourceSplit;
+import 
org.apache.flink.connector.kafka.source.split.DynamicKafkaSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.util.Properties;
+
+/**
+ * Factory class for the DynamicKafkaSource components. <a
+ * href="https://cwiki.apache.org/confluence/x/CBn1D">FLIP-246: DynamicKafkaSource</a>
+ *
+ * This source's key difference from {@link KafkaSource} is that it enables 
users to read
+ * dynamically from topics, which belong to one or more clusters.
+ *
+ * @param  Record type
+ */
+@PublicEvolving
+public class DynamicKafkaSource

Review Comment:
   Yes, I was a bit lazy there, since it required the diagrams to change. Let 
me revisit that now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34032) Cleanup local-recovery dir when switching local-recovery from enabled to disabled

2024-01-08 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu updated FLINK-34032:
-
Summary: Cleanup local-recovery dir when switching local-recovery from 
enabled to disabled  (was: Cleanup local=recovery dir when switching 
local-recovery from enabled to disabled)






[jira] [Created] (FLINK-34032) Cleanup local=recovery dir when switching local-recovery from enabled to disabled

2024-01-08 Thread Hangxiang Yu (Jira)
Hangxiang Yu created FLINK-34032:


 Summary: Cleanup local=recovery dir when switching local-recovery 
from enabled to disabled
 Key: FLINK-34032
 URL: https://issues.apache.org/jira/browse/FLINK-34032
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Hangxiang Yu


When switching local-recovery from enabled to disabled, the local-recovery dir 
could not be cleaned.

In particular, for a job that switched multiple times, lots of historical local 
checkpoints will be retained forever.





[jira] [Created] (FLINK-34031) Hive table sources report statistics in various formats

2024-01-08 Thread hanjie (Jira)
hanjie created FLINK-34031:
--

 Summary: Hive table sources report statistics in various formats
 Key: FLINK-34031
 URL: https://issues.apache.org/jira/browse/FLINK-34031
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: hanjie


Currently, Hive table sources only support reporting statistics for the ORC 
and Parquet formats.

We have some Hive tables in text format. When such a text table is used as a 
dimension table, the job should use a broadcast join, but table stats cannot 
be obtained for text format tables. 

So, Hive table sources should report statistics for more formats, such as 
`text`.





[jira] [Updated] (FLINK-34031) Hive table sources report statistics in various formats

2024-01-08 Thread hanjie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hanjie updated FLINK-34031:
---
Component/s: Connectors / Hive
 (was: Table SQL / API)

> Hive table sources report statistics in various formats
> ---
>
> Key: FLINK-34031
> URL: https://issues.apache.org/jira/browse/FLINK-34031
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Reporter: hanjie
>Priority: Major
>
> Currently, Hive table sources only support reporting statistics for the ORC 
> and Parquet formats.
> We have some Hive tables in text format. When such a text table is used as a 
> dimension table, the job should use a broadcast join, but table stats cannot 
> be obtained for text format tables. 
> So, Hive table sources should report statistics for more formats, such as 
> `text`.





Re: [PR] [FLINK-33792] Generate the same code for the same logic [flink]

2024-01-08 Thread via GitHub


zoudan commented on PR #23984:
URL: https://github.com/apache/flink/pull/23984#issuecomment-1882328998

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33792] Generate the same code for the same logic [flink]

2024-01-08 Thread via GitHub


zoudan commented on code in PR #23984:
URL: https://github.com/apache/flink/pull/23984#discussion_r1445558740


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala:
##
@@ -87,10 +90,11 @@ object LongHashJoinGenerator {
   def genProjection(
   tableConfig: ReadableConfig,
   classLoader: ClassLoader,
-  types: Array[LogicalType]): GeneratedProjection = {
+  types: Array[LogicalType],
+  parentCtx: CodeGeneratorContext): GeneratedProjection = {

Review Comment:
   Sorry, I do not get your point. I think `tableConfig` and `classLoader` are 
needed in this method, and I only added the parameter `parentCtx` without 
changing anything else.



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala:
##
@@ -124,14 +124,27 @@ object CodeGenUtils {
 
   private val nameCounter = new AtomicLong
 
-  def newName(name: String): String = {
-s"$name$$${nameCounter.getAndIncrement}"
+  def newName(context: CodeGeneratorContext, name: String): String = {
+if (context == null || context.getNameCounter == null) {
+  // Add an 'i' in the middle to distinguish from nameCounter in CodeGeneratorContext
+  // and avoid naming conflicts.
+  s"$name$$i${nameCounter.getAndIncrement}"
+} else {
+  s"$name$$${context.getNameCounter.getAndIncrement}"
+}
   }
 
-  def newNames(names: String*): Seq[String] = {
+  def newNames(context: CodeGeneratorContext, names: String*): Seq[String] = {
 require(names.toSet.size == names.length, "Duplicated names")
-val newId = nameCounter.getAndIncrement
-names.map(name => s"$name$$$newId")
+if (context == null || context.getNameCounter == null) {
+  val newId = nameCounter.getAndIncrement
+  // Add an 'i' in the middle to distinguish from nameCounter in CodeGeneratorContext

Review Comment:
There may be some scenarios where it is not convenient for us to obtain the 
class-level `CodeGeneratorContext`, and we could use the nameCounter in 
`CodeGenUtils` to generate new names. In these cases, names from the nameCounter 
in `CodeGenUtils` and in `CodeGeneratorContext` may end up in the same class.
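
The naming scheme discussed in this comment can be sketched roughly as follows. This is a standalone Java illustration, not the planner's Scala code: `NameSketch`, `Context`, and this `newName` are hypothetical stand-ins for `CodeGenUtils`, `CodeGeneratorContext`, and `CodeGenUtils.newName`. It is meant to show why the extra `i` keeps names drawn from the global counter from colliding with names drawn from a per-context counter inside the same generated class.

```java
import java.util.concurrent.atomic.AtomicLong;

public class NameSketch {
    /** Hypothetical stand-in for a class-level CodeGeneratorContext. */
    static final class Context {
        private final AtomicLong nameCounter = new AtomicLong();

        AtomicLong getNameCounter() {
            return nameCounter;
        }
    }

    // Global fallback counter, playing the role of the counter kept in CodeGenUtils.
    private static final AtomicLong GLOBAL_COUNTER = new AtomicLong();

    static String newName(Context context, String name) {
        if (context == null || context.getNameCounter() == null) {
            // The 'i' marks names that come from the global counter, so they can
            // never equal a name produced by a per-context counter.
            return name + "$i" + GLOBAL_COUNTER.getAndIncrement();
        }
        return name + "$" + context.getNameCounter().getAndIncrement();
    }

    public static void main(String[] args) {
        Context ctx = new Context();
        String fromContext = newName(ctx, "field"); // field$0 for a fresh context
        String fromGlobal = newName(null, "field"); // field$i<n>, n from the global counter
        System.out.println(fromContext + " vs " + fromGlobal);
    }
}
```

Without the marker, both counters could produce `field$0` in the same class, which is the conflict the code comment in the diff refers to.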



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java:
##
@@ -88,7 +91,16 @@ public ZoneId getSessionZoneId() {
 public ClassLoader getClassLoader() {
 return classLoader;
 }
+
+@Override
+@Nullable
+public CodeGeneratorContext getCodeGeneratorContext() {
+return null;

Review Comment:
   Change it to non null






[jira] [Commented] (FLINK-33996) Support disabling project rewrite when multiple exprs in the project reference the same sub project field.

2024-01-08 Thread Feng Jin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804557#comment-17804557
 ] 

Feng Jin commented on FLINK-33996:
--

[~libenchao] [~qingyue]  If it can be reused during the codegen phase, then it 
is indeed a better approach.  

In addition, I would like to ask whether the current optimization rules are 
guaranteed to rewrite the same project into the same Calc node. At least in my 
tests, this is the behavior. Otherwise, it may not be possible to reuse it in 
the codegen phase.

> Support disabling project rewrite when multiple exprs in the project 
> reference the same sub project field.
> --
>
> Key: FLINK-33996
> URL: https://issues.apache.org/jira/browse/FLINK-33996
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Feng Jin
>Priority: Major
>  Labels: pull-request-available
>
> When multiple top projects reference the same bottom project, project rewrite 
> rules may result in complex projects being calculated multiple times.
> Take the following SQL as an example:
> {code:sql}
> create table test_source(a varchar) with ('connector'='datagen');
> explain plan for select a || 'a' as a, a || 'b' as b FROM (select 
> REGEXP_REPLACE(a, 'aaa', 'bbb') as a FROM test_source);
> {code}
> The final SQL plan is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>+- LogicalTableScan(table=[[default_catalog, default_database, 
> test_source]])
> == Optimized Physical Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
> _UTF-16LE'a') AS a, ||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
> _UTF-16LE'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
> fields=[a])
> == Optimized Execution Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'a') AS a, 
> ||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
> fields=[a])
> {code}
> It can be observed that after project rewrite, REGEXP_REPLACE is calculated twice. 
> Generally speaking, regular expression matching is a time-consuming operation 
> and we usually do not want it to be calculated multiple times. Therefore, for 
> this scenario, we can support disabling project rewrite.
> After disabling some rules, the final plan we obtained is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>+- LogicalTableScan(table=[[default_catalog, default_database, 
> test_source]])
> == Optimized Physical Plan ==
> Calc(select=[||(a, _UTF-16LE'a') AS a, ||(a, _UTF-16LE'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb') AS a])
>+- TableSourceScan(table=[[default_catalog, default_database, 
> test_source]], fields=[a])
> == Optimized Execution Plan ==
> Calc(select=[||(a, 'a') AS a, ||(a, 'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, 'aaa', 'bbb') AS a])
>+- TableSourceScan(table=[[default_catalog, default_database, 
> test_source]], fields=[a])
> {code}
> After testing, we probably need to modify these few rules:
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkCalcMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectCalcMergeRule





[jira] [Updated] (FLINK-33996) Support disabling project rewrite when multiple exprs in the project reference the same sub project field.

2024-01-08 Thread Feng Jin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jin updated FLINK-33996:
-
Description: 
When multiple top projects reference the same bottom project, project rewrite 
rules may result in complex projects being calculated multiple times.

Take the following SQL as an example:
{code:sql}
create table test_source(a varchar) with ('connector'='datagen');

explain plan for select a || 'a' as a, a || 'b' as b FROM (select 
REGEXP_REPLACE(a, 'aaa', 'bbb') as a FROM test_source);
{code}
The final SQL plan is as follows:
{code:sql}
== Abstract Syntax Tree ==
LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
+- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
   +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])

== Optimized Physical Plan ==
Calc(select=[||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
_UTF-16LE'a') AS a, ||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
_UTF-16LE'b') AS b])
+- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
fields=[a])

== Optimized Execution Plan ==
Calc(select=[||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'a') AS a, 
||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'b') AS b])
+- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
fields=[a])
{code}
It can be observed that after project rewrite, REGEXP_REPLACE is calculated twice. 
Generally speaking, regular expression matching is a time-consuming operation 
and we usually do not want it to be calculated multiple times. Therefore, for 
this scenario, we can support disabling project rewrite.

After disabling some rules, the final plan we obtained is as follows:
{code:sql}
== Abstract Syntax Tree ==
LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
+- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
   +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])

== Optimized Physical Plan ==
Calc(select=[||(a, _UTF-16LE'a') AS a, ||(a, _UTF-16LE'b') AS b])
+- Calc(select=[REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb') AS a])
   +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
fields=[a])

== Optimized Execution Plan ==
Calc(select=[||(a, 'a') AS a, ||(a, 'b') AS b])
+- Calc(select=[REGEXP_REPLACE(a, 'aaa', 'bbb') AS a])
   +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
fields=[a])
{code}
After testing, we probably need to modify these few rules:

org.apache.flink.table.planner.plan.rules.logical.FlinkProjectMergeRule

org.apache.flink.table.planner.plan.rules.logical.FlinkCalcMergeRule

org.apache.flink.table.planner.plan.rules.logical.FlinkProjectCalcMergeRule

  was:
When multiple top projects reference the same bottom project, project rewrite 
rules may result in complex projects being calculated multiple times.

Take the following SQL as an example:
{code:sql}
create table test_source(a varchar) with ('connector'='datagen');

explan plan for select a || 'a' as a, a || 'b' as b FROM (select 
REGEXP_REPLACE(a, 'aaa', 'bbb') as a FROM test_source);
{code}
The final SQL plan is as follows:
{code:sql}
== Abstract Syntax Tree ==
LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
+- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
   +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])

== Optimized Physical Plan ==
Calc(select=[||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
_UTF-16LE'a') AS a, ||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
_UTF-16LE'b') AS b])
+- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
fields=[a])

== Optimized Execution Plan ==
Calc(select=[||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'a') AS a, 
||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'b') AS b])
+- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
fields=[a])
{code}
It can be observed that after project rewrite, REGEXP_REPLACE is calculated twice. 
Generally speaking, regular expression matching is a time-consuming operation 
and we usually do not want it to be calculated multiple times. Therefore, for 
this scenario, we can support disabling project rewrite.

After disabling some rules, the final plan we obtained is as follows:
{code:sql}
== Abstract Syntax Tree ==
LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
+- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
   +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])

== Optimized Physical Plan ==
Calc(select=[||(a, _UTF-16LE'a') AS a, ||(a, _UTF-16LE'b') AS b])
+- Calc(select=[REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb') AS a])
   +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
fields=[a])

== Optimized Execution Plan ==
Calc(select=[||(a, 'a') AS a, ||(a, 

Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-08 Thread via GitHub


WencongLiu commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1445576114


##
pom.xml:
##
@@ -2301,6 +2301,8 @@ under the License.

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.lang.Class,java.lang.Object[])

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection,org.apache.flink.api.common.typeinfo.TypeInformation)
+   
org.apache.flink.api.common.functions.RuntimeContext
+   
org.apache.flink.api.connector.source.SourceReaderContext

Review Comment:
   I have merged these changes into the commit where they belong.






[jira] [Assigned] (FLINK-33879) Hybrid Shuffle may hang during redistribution

2024-01-08 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-33879:
--

Assignee: Jiang Xin

> Hybrid Shuffle may hang during redistribution
> -
>
> Key: FLINK-33879
> URL: https://issues.apache.org/jira/browse/FLINK-33879
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Reporter: Jiang Xin
>Assignee: Jiang Xin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Currently, Hybrid Shuffle can work with the memory tier and the disk tier 
> together. However, in the following scenario the result partition would stop 
> working.
> Suppose we have a shuffle task with 2 sub-partitions. The LocalBufferPool has 
> 15 buffers, the memory tier can use at most 15-(2*(2+1)+1) = 8 buffers 
> according to `TieredStorageMemoryManagerImpl#getMaxNonReclaimableBuffers`. If 
> the memory tier uses up all 8 buffers and the input channel doesn't consume 
> them because of some problem, the disk tier can still work with 1 reserved 
> buffer. However, if a redistribution happens now and the pool size is 
> decreased to less than 8, then the BufferAccumulator can not request buffers 
> anymore, and thus the result partition stops working as well.
> The goal is to keep the result partition working with the disk tier and 
> writing the shuffle data to disk, so that once the input channel is ready, the 
> data on the disk can be consumed immediately.
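
The arithmetic in this scenario can be checked with a small sketch. The value 7 is simply the term `2*(2+1)+1` from the description; what each factor stands for is not spelled out there, so it is treated as an opaque count of buffers set aside for other users. `maxNonReclaimableBuffers` and `canRequestBuffer` are hypothetical helpers, and the second one assumes a simplified pool model in which a request succeeds only while fewer buffers are outstanding than the current pool size. Neither is the actual `TieredStorageMemoryManagerImpl` or `LocalBufferPool` code.

```java
public class HybridShuffleBufferSketch {
    /** Buffers the memory tier may hold: pool size minus the buffers set aside for others. */
    static int maxNonReclaimableBuffers(int poolSize, int guaranteedReclaimable) {
        return poolSize - guaranteedReclaimable;
    }

    /** Simplified model: a buffer is handed out only while fewer than poolSize are outstanding. */
    static boolean canRequestBuffer(int poolSize, int outstandingBuffers) {
        return outstandingBuffers < poolSize;
    }

    public static void main(String[] args) {
        int guaranteed = 2 * (2 + 1) + 1; // 7, the term taken from the description
        int memoryTierMax = maxNonReclaimableBuffers(15, guaranteed); // 15 - 7 = 8

        System.out.println("memory tier limit: " + memoryTierMax);
        // With 15 buffers in the pool, 8 held by the memory tier leaves room for the disk tier.
        System.out.println("before shrink: " + canRequestBuffer(15, memoryTierMax)); // true
        // After the pool shrinks below 8, the 8 held buffers already exceed the pool size.
        System.out.println("after shrink to 7: " + canRequestBuffer(7, memoryTierMax)); // false
    }
}
```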





Re: [PR] [FLINK-33938][runtime-web] Correct implicit coercions in relational operators to adopt typescript 5.0 [flink]

2024-01-08 Thread via GitHub


WencongLiu commented on PR #23999:
URL: https://github.com/apache/flink/pull/23999#issuecomment-1882250920

   These two modifications prevent the code from relying on implicit type 
conversions. LGTM.





[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2024-01-08 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804553#comment-17804553
 ] 

xiaogang zhou commented on FLINK-33728:
---

[~mapohl] I think your concern is very important, and my earlier statement 
was not precise enough. After your reminder, I'd like to change it to:

We can just ignore the disconnection of the watching process if there is no 
pending request, and try to rewatch once a new requestResource is called.

And we can choose to fail all CompletableFuture instances. 
[requestWorkerIfRequired|https://github.com/apache/flink/blob/2b9b9859253698c3c90ca420f10975e27e6c52d4/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java#L332]
 will then request the resources again, which will trigger the rewatch.

WDYT [~mapohl] [~xtsong] 

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> I hit a massive production problem when Kubernetes etcd was responding slowly. 
> After Kubernetes recovered an hour later, thousands of Flink jobs using 
> KubernetesResourceManagerDriver rewatched when receiving 
> ResourceVersionTooOld, which put great pressure on the API server and made 
> it fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method.
>  
> We can just ignore the disconnection of the watching process and try to rewatch 
> once a new requestResource is called. And we can rely on the Akka heartbeat 
> timeout to discover TM failures, just as YARN mode does.





Re: [PR] [FLINK-33583][table-planner] Support state ttl hint for regular join [flink]

2024-01-08 Thread via GitHub


xuyangzhong commented on code in PR #23752:
URL: https://github.com/apache/flink/pull/23752#discussion_r1445560825


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java:
##
@@ -43,6 +44,57 @@ class ConfigureOperatorLevelStateTtlJsonITCase extends 
JsonPlanTestBase {
 
 @Test
 void testDifferentStateTtlForDifferentOneInputOperator() throws Exception {
+innerTestDeduplicateAndGroupAggregate(

Review Comment:
   Sorry, I'll add it back.






Re: [PR] [FLINK-33583][table-planner] Support state ttl hint for regular join [flink]

2024-01-08 Thread via GitHub


xuyangzhong commented on code in PR #23752:
URL: https://github.com/apache/flink/pull/23752#discussion_r1445559559


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java:
##
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.hints.stream;
+
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.utils.PlanKind;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import scala.Enumeration;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link org.apache.flink.table.planner.hint.StateTtlHint}. */
+class StateTtlHintTest extends TableTestBase {
+
+protected StreamTableTestUtil util;
+
+@BeforeEach
+void before() {
+util = streamTestUtil(TableConfig.getDefault());
+util.tableEnv()
+.executeSql(
+"CREATE TABLE T1 (\n"
++ "  a1 BIGINT,\n"
++ "  b1 VARCHAR\n"
++ ") WITH (\n"
++ " 'connector' = 'values'\n"
++ ")");
+util.tableEnv()
+.executeSql(
+"CREATE TABLE T2 (\n"
++ "  a2 BIGINT,\n"
++ "  b2 VARCHAR\n"
++ ") WITH (\n"
++ " 'connector' = 'values'\n"
++ ")");
+
+util.tableEnv()
+.executeSql(
+"CREATE TABLE T3 (\n"
++ "  a3 BIGINT,\n"
++ "  b3 VARCHAR\n"
++ ") WITH (\n"
++ " 'connector' = 'values'\n"
++ ")");
+
+util.tableEnv().executeSql("CREATE View V4 as select a3 as a4, b3 as 
b4 from T3");
+
+util.tableEnv()
+.executeSql("create view V5 as select T1.* from T1 join T2 on 
T1.a1 = T2.a2");
+}
+
+@Test
+void testSimpleJoinStateTtlHintWithEachSide() {
+String sql =
+"select /*+ STATE_TTL('T2' = '2d', 'T1' = '1d') */* from T1 
join T2 on T1.a1 = T2.a2";
+verify(sql);
+}
+
+@Test
+void testJoinStateTtlHintOnlyWithRightSide() {
+String sql = "select /*+ STATE_TTL('T2' = '2d') */* from T1 join T2 on 
T1.a1 = T2.a2";
+verify(sql);
+}
+
+@Test
+void testJoinStateTtlHintWithContinuousJoin() {
+String sql =
+"select /*+ STATE_TTL('T2' = '2d', 'T3' = '3d', 'T1' = '1d') 
*/* from T1, T2, T3 where T1.a1 = T2.a2 and T2.b2 = T3.b3";
+verify(sql);
+}
+
+@Test
+void testJoinStateTtlHintWithMultiLevelJoin() {
+String sql =
+"select /*+ STATE_TTL('T2' = '2d', 'T3' = '3d', 'T1' = '1d') 
*/* from T1 "
++ "join (select T2.* from T2 join T3 on T2.b2 = T3.b3) 
TMP on T1.a1 = TMP.b2";
+assertThatThrownBy(() -> verify(sql))
+.isInstanceOf(ValidationException.class)
+.hasMessageContaining(
+"The options of following hints cannot match the name 
of input tables or views: \n`%s` in `%s`",
+"T2, T3", "STATE_TTL");
+}
+
+@Test
+void testJoinStateTtlHintWithOneUnknownTable() {
+String sql =
+"select /*+ STATE_TTL('T5' = '2d', 'T1' = '1d') */* from T1 
join T2 on T1.a1 = T2.a2";
+
+assertThatThrownBy(() -> verify(sql))
+.isInstanceOf(ValidationException.class)
+.hasMessageContaining(
+"The options of following hints cannot match the name 
of input tables or views: \n`%s` in `%s`",
+   

Re: [PR] [FLINK-33583][table-planner] Support state ttl hint for regular join [flink]

2024-01-08 Thread via GitHub


xuyangzhong commented on code in PR #23752:
URL: https://github.com/apache/flink/pull/23752#discussion_r1445548152


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala:
##
@@ -116,13 +123,27 @@ class StreamPhysicalJoin(
   }
 
   override def translateToExecNode(): ExecNode[_] = {
+val stateTtlFromHint = new util.HashMap[JInt, JLong]
+getHints
+  .filter(hint => StateTtlHint.isStateTtlHint(hint.hintName))
+  .forEach {
+hint =>
+  hint.kvOptions.forEach(
+(input, ttl) =>
+  stateTtlFromHint
+.put(
+  if (input == FlinkHints.LEFT_INPUT) 0 else 1,

Review Comment:
   I have some different ideas here. 
   1. In the explain syntax, if the user uses "1d" in the state ttl hint, I 
think it is easier to understand by displaying "1d" than by displaying 
"86400000 ms". 
   2. The redundancy in the code is limited to the 
"TimeUtils.parseDuration(ttl).toMillis" part, which I think is tolerable. 
Otherwise, based on the first consideration, we would have to add 
"TimeUtils.parseDuration(ttl)" to explain, which would also cause some 
redundancy.
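
The trade-off in this comment can be illustrated with a small standalone sketch: keep the user's spelling for explain output and convert to milliseconds only where the runtime needs them. `StateTtlSketch` and `parseTtl` are hypothetical and cover only a few units; they are a stand-in for, not a copy of, Flink's `TimeUtils.parseDuration`.

```java
import java.time.Duration;

public class StateTtlSketch {
    /** Hypothetical, minimal parser for values such as "1d", "2h", "30s" or "500ms". */
    static Duration parseTtl(String text) {
        String t = text.trim().toLowerCase();
        int split = 0;
        while (split < t.length() && Character.isDigit(t.charAt(split))) {
            split++;
        }
        if (split == 0) {
            throw new IllegalArgumentException("No number in: " + text);
        }
        long value = Long.parseLong(t.substring(0, split));
        String unit = t.substring(split).trim();
        switch (unit) {
            case "d":
                return Duration.ofDays(value);
            case "h":
                return Duration.ofHours(value);
            case "s":
                return Duration.ofSeconds(value);
            case "ms":
                return Duration.ofMillis(value);
            default:
                throw new IllegalArgumentException("Unsupported unit in: " + text);
        }
    }

    public static void main(String[] args) {
        String hint = "1d";
        // Keep the user's spelling for explain output; convert only where millis are needed.
        System.out.println("explain shows: " + hint);
        System.out.println("runtime uses:  " + parseTtl(hint).toMillis() + " ms"); // 86400000 ms
    }
}
```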






Re: [PR] [FLINK-34030][changelog] Introduce periodic-materialize.enabled to enable/disable periodic materialization [flink]

2024-01-08 Thread via GitHub


flinkbot commented on PR #24046:
URL: https://github.com/apache/flink/pull/24046#issuecomment-1882187313

   
   ## CI report:
   
   * ebd87ccef19fb417e320bba6b9c2e54cf4d94b52 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [FLINK-34030][changelog] Introduce periodic-materialize.enabled to enable/disable periodic materialization [flink]

2024-01-08 Thread via GitHub


masteryhx commented on PR #24046:
URL: https://github.com/apache/flink/pull/24046#issuecomment-1882178534

   @JunRuiLee @fredia Could you help review this? Thanks a lot.





[jira] [Updated] (FLINK-34030) Avoid using negative value for periodic-materialize.interval

2024-01-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34030:
---
Labels: pull-request-available  (was: )

> Avoid using negative value for periodic-materialize.interval
> 
>
> Key: FLINK-34030
> URL: https://issues.apache.org/jira/browse/FLINK-34030
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Minor
>  Labels: pull-request-available
>
> Similar to FLINK-32023, a negative value doesn't work for Duration Type.





[PR] [FLINK-34030][changelog] Introduce periodic-materialize.enabled to enable/disable periodic materialization [flink]

2024-01-08 Thread via GitHub


masteryhx opened a new pull request, #24046:
URL: https://github.com/apache/flink/pull/24046

   
   
   ## What is the purpose of the change
   
   Similar to [FLINK-32023](https://issues.apache.org/jira/browse/FLINK-32023), 
a negative value doesn't work for Duration Type options like 
periodic-materialize.interval
   
   ## Brief change log
   
 - *Introduce periodic-materialize.enabled to enable/disable periodic 
materialization*
 - *Replace all negative cases about periodic-materialize.interval with 
periodic-materialize.enabled*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added StreamExecutionEnvironmentTest#testPeriodicMaterializeEnabled and 
StreamExecutionEnvironmentTest#testPeriodicMaterializeInterval*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes, but it only introduces a new option whose default 
behavior is the same as before
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? docs 
   





[jira] [Assigned] (FLINK-34013) ProfilingServiceTest.testRollingDeletion is unstable on AZP

2024-01-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-34013:


Assignee: Yu Chen  (was: Yu Chen)

> ProfilingServiceTest.testRollingDeletion is unstable on AZP
> ---
>
> Key: FLINK-34013
> URL: https://issues.apache.org/jira/browse/FLINK-34013
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.19.0
>Reporter: Sergey Nuyanzin
>Assignee: Yu Chen
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> This build 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56073=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7c1d86e3-35bd-5fd5-3b7c-30c126a78702=8258
>  fails as 
> {noformat}
> Jan 06 02:09:28 org.opentest4j.AssertionFailedError: expected: <2> but was: 
> <3>
> Jan 06 02:09:28   at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> Jan 06 02:09:28   at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> Jan 06 02:09:28   at 
> org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
> Jan 06 02:09:28   at 
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
> Jan 06 02:09:28   at 
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)
> Jan 06 02:09:28   at 
> org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:531)
> Jan 06 02:09:28   at 
> org.apache.flink.runtime.util.profiler.ProfilingServiceTest.verifyRollingDeletionWorks(ProfilingServiceTest.java:167)
> Jan 06 02:09:28   at 
> org.apache.flink.runtime.util.profiler.ProfilingServiceTest.testRollingDeletion(ProfilingServiceTest.java:117)
> Jan 06 02:09:28   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 06 02:09:28   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Jan 06 02:09:28   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Jan 06 02:09:28   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Jan 06 02:09:28   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Jan 06 02:09:28   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {noformat}





[jira] [Assigned] (FLINK-34013) ProfilingServiceTest.testRollingDeletion is unstable on AZP

2024-01-08 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-34013:


Assignee: Yu Chen

> ProfilingServiceTest.testRollingDeletion is unstable on AZP
> ---
>
> Key: FLINK-34013
> URL: https://issues.apache.org/jira/browse/FLINK-34013
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.19.0
>Reporter: Sergey Nuyanzin
>Assignee: Yu Chen
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> This build 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56073=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7c1d86e3-35bd-5fd5-3b7c-30c126a78702=8258
>  fails as 
> {noformat}
> Jan 06 02:09:28 org.opentest4j.AssertionFailedError: expected: <2> but was: 
> <3>
> Jan 06 02:09:28   at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> Jan 06 02:09:28   at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> Jan 06 02:09:28   at 
> org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
> Jan 06 02:09:28   at 
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
> Jan 06 02:09:28   at 
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)
> Jan 06 02:09:28   at 
> org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:531)
> Jan 06 02:09:28   at 
> org.apache.flink.runtime.util.profiler.ProfilingServiceTest.verifyRollingDeletionWorks(ProfilingServiceTest.java:167)
> Jan 06 02:09:28   at 
> org.apache.flink.runtime.util.profiler.ProfilingServiceTest.testRollingDeletion(ProfilingServiceTest.java:117)
> Jan 06 02:09:28   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 06 02:09:28   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Jan 06 02:09:28   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Jan 06 02:09:28   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Jan 06 02:09:28   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Jan 06 02:09:28   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {noformat}





Re: [PR] [FLINK-33583][table-planner] Support state ttl hint for regular join [flink]

2024-01-08 Thread via GitHub


xuyangzhong commented on code in PR #23752:
URL: https://github.com/apache/flink/pull/23752#discussion_r1445548152


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala:
##
@@ -116,13 +123,27 @@ class StreamPhysicalJoin(
   }
 
   override def translateToExecNode(): ExecNode[_] = {
+val stateTtlFromHint = new util.HashMap[JInt, JLong]
+getHints
+  .filter(hint => StateTtlHint.isStateTtlHint(hint.hintName))
+  .forEach {
+hint =>
+  hint.kvOptions.forEach(
+(input, ttl) =>
+  stateTtlFromHint
+.put(
+  if (input == FlinkHints.LEFT_INPUT) 0 else 1,

Review Comment:
   I have a different view here. 
   1. In the explain output, if the user writes "1d" in the state ttl hint, I 
think displaying "1d" is easier to understand than displaying 
"86400000 ms". 
   2. The only redundant code is the 
"TimeUtils.parseDuration(ttl).toMillis" call, which I think is tolerable. 
Otherwise, given the first point, we would have to add 
"TimeUtils.parseDuration(ttl)" to the explainTerms method, which would also 
cause some redundancy.
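
   As a rough illustration of the conversion discussed in this comment (a 
sketch only, not the PR's code): the class `StateTtlDisplayExample` and its 
`parseTtl` helper are hypothetical stand-ins for `TimeUtils.parseDuration`, 
and the unit suffixes they accept were chosen just for this example. The 
sketch shows how a hint value of "1d" maps to its millisecond form.

```java
import java.time.Duration;
import java.util.Locale;

// Hypothetical example class; not part of Flink.
class StateTtlDisplayExample {

    /**
     * Hypothetical helper that parses a short TTL literal such as "1d", "2h",
     * "30min", "45s" or "500ms". It only approximates what a real duration
     * parser would accept.
     */
    public static Duration parseTtl(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        int i = 0;
        // Split the literal into its leading digits and the unit suffix.
        while (i < s.length() && Character.isDigit(s.charAt(i))) {
            i++;
        }
        if (i == 0) {
            throw new IllegalArgumentException("No numeric value in: " + text);
        }
        long value = Long.parseLong(s.substring(0, i));
        String unit = s.substring(i).trim();
        switch (unit) {
            case "d":
                return Duration.ofDays(value);
            case "h":
                return Duration.ofHours(value);
            case "min":
                return Duration.ofMinutes(value);
            case "s":
                return Duration.ofSeconds(value);
            case "ms":
                return Duration.ofMillis(value);
            default:
                throw new IllegalArgumentException("Unsupported unit in: " + text);
        }
    }

    public static void main(String[] args) {
        String hint = "1d";
        long millis = parseTtl(hint).toMillis();
        // prints "1d -> 86400000 ms"
        System.out.println(hint + " -> " + millis + " ms");
    }
}
```

   Keeping the original literal for display and converting to milliseconds 
only where the numeric value is needed is the trade-off this comment argues 
for.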



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-34030) Avoid using negative value for periodic-materialize.interval

2024-01-08 Thread Hangxiang Yu (Jira)
Hangxiang Yu created FLINK-34030:


 Summary: Avoid using negative value for 
periodic-materialize.interval
 Key: FLINK-34030
 URL: https://issues.apache.org/jira/browse/FLINK-34030
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Hangxiang Yu
Assignee: Hangxiang Yu


Similar to FLINK-32023, a negative value doesn't work for the Duration type.





Re: [PR] [FLINK-32416] initial implementation of DynamicKafkaSource with bound… [flink-connector-kafka]

2024-01-08 Thread via GitHub


mas-chen commented on code in PR #44:
URL: 
https://github.com/apache/flink-connector-kafka/pull/44#discussion_r1445538272


##
flink-connector-kafka/src/test/resources/log4j2-test.properties:
##
@@ -36,3 +36,12 @@ logger.zookeeper.name = org.apache.zookeeper
 logger.zookeeper.level = OFF
 logger.I0Itec.name = org.I0Itec
 logger.I0Itec.level = OFF
+
+logger.DynamicKafkaSourceReader.name = 
org.apache.flink.connector.kafka.source.reader.DynamicKafkaSourceReader
+logger.DynamicKafkaSourceReader.level = OFF
+
+logger.kafkaSourceReader.name = 
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader
+logger.kafkaSourceReader.level = OFF
+
+logger.SourceReaderBase.name = 
org.apache.flink.connector.base.source.reader.SourceReaderBase
+logger.SourceReaderBase.level = OFF

Review Comment:
   This was for troubleshooting integration tests. I'll remove them now that 
they are stable.






Re: [PR] [FLINK-33583][table-planner] Support state ttl hint for regular join [flink]

2024-01-08 Thread via GitHub


xuyangzhong commented on code in PR #23752:
URL: https://github.com/apache/flink/pull/23752#discussion_r1445536689


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/QueryHintsResolver.java:
##
@@ -84,6 +85,12 @@ protected RelNode visitBiRel(BiRel biRel) {
 Set existentKVHints = new HashSet<>();
 
 List oldHints = ((Hintable) biRel).getHints();
+List oldQueryHints = FlinkHints.getAllQueryHints(oldHints);
+// has no hints, return directly.
+if (oldQueryHints.isEmpty()) {
+return super.visitChildren(biRel);
+}
+

Review Comment:
   For example, table hints. Return directly here because the current BiRel may 
not have query hints, so there is no need to process it.






[jira] [Created] (FLINK-34029) Support different profiling mode on Flink WEB

2024-01-08 Thread Yu Chen (Jira)
Yu Chen created FLINK-34029:
---

 Summary: Support different profiling mode on Flink WEB
 Key: FLINK-34029
 URL: https://issues.apache.org/jira/browse/FLINK-34029
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Web Frontend
Affects Versions: 1.19.0
Reporter: Yu Chen








Re: [PR] [FLINK-34013][runtime] Remove duplicated stopProfiling() in `ProfilingService` [flink]

2024-01-08 Thread via GitHub


flinkbot commented on PR #24045:
URL: https://github.com/apache/flink/pull/24045#issuecomment-1882116192

   
   ## CI report:
   
   * 237bddb2c138acf773d79fda697c66ca82896ec2 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-34013) ProfilingServiceTest.testRollingDeletion is unstable on AZP

2024-01-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34013:
---
Labels: pull-request-available test-stability  (was: test-stability)

> ProfilingServiceTest.testRollingDeletion is unstable on AZP
> ---
>
> Key: FLINK-34013
> URL: https://issues.apache.org/jira/browse/FLINK-34013
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.19.0
>Reporter: Sergey Nuyanzin
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> This build 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56073=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7c1d86e3-35bd-5fd5-3b7c-30c126a78702=8258
>  fails as 
> {noformat}
> Jan 06 02:09:28 org.opentest4j.AssertionFailedError: expected: <2> but was: 
> <3>
> Jan 06 02:09:28   at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> Jan 06 02:09:28   at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> Jan 06 02:09:28   at 
> org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
> Jan 06 02:09:28   at 
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
> Jan 06 02:09:28   at 
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)
> Jan 06 02:09:28   at 
> org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:531)
> Jan 06 02:09:28   at 
> org.apache.flink.runtime.util.profiler.ProfilingServiceTest.verifyRollingDeletionWorks(ProfilingServiceTest.java:167)
> Jan 06 02:09:28   at 
> org.apache.flink.runtime.util.profiler.ProfilingServiceTest.testRollingDeletion(ProfilingServiceTest.java:117)
> Jan 06 02:09:28   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 06 02:09:28   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Jan 06 02:09:28   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Jan 06 02:09:28   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Jan 06 02:09:28   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Jan 06 02:09:28   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {noformat}





[PR] [FLINK-34013][runtime] Remove duplicated stopProfiling() in `ProfilingService` [flink]

2024-01-08 Thread via GitHub


yuchen-ecnu opened a new pull request, #24045:
URL: https://github.com/apache/flink/pull/24045

   
   ## What is the purpose of the change
   
   Remove the duplicated `stopProfiling()` call in `ProfilingService` to avoid 
profiling instances being stopped unexpectedly when profiling is repeated quickly.
   
   
   ## Brief change log
   
   - Remove duplicate scheduled stopProfiling task.
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
`ProfilingServiceTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   





Re: [PR] [FLINK-34027] Introduces AsyncScalarFunction as a new UDF type [flink]

2024-01-08 Thread via GitHub


AlanConfluent commented on PR #23975:
URL: https://github.com/apache/flink/pull/23975#issuecomment-1882073976

   @twalthr Thanks Timo for the in-depth review!  I think I was able to respond 
to everything you mentioned, no real straggling issues.  Please take another 
look.





[jira] [Updated] (FLINK-34027) Create initial PR for FLIP-400

2024-01-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34027:
---
Labels: pull-request-available  (was: )

> Create initial PR for FLIP-400
> --
>
> Key: FLINK-34027
> URL: https://issues.apache.org/jira/browse/FLINK-34027
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Alan Sheinberg
>Priority: Major
>  Labels: pull-request-available
>






Re: [PR] [FLINK-33611] [flink-protobuf] Support Large Protobuf Schemas [flink]

2024-01-08 Thread via GitHub


sharath1709 commented on PR #23937:
URL: https://github.com/apache/flink/pull/23937#issuecomment-1882015968

   @flinkbot run azure





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-08 Thread via GitHub


snuyanzin commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1882007369

   @libenchao since you participated in the Jira discussion, could you also have a look 
please?
   I'm also asking because there are now commits from both David and me, and it 
would be great to have another pair of eyes here.





[jira] [Commented] (FLINK-24024) Support session Window TVF

2024-01-08 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804517#comment-17804517
 ] 

Sergey Nuyanzin commented on FLINK-24024:
-

[~xuyangzhong] thanks a lot for working on this, and really sorry for the delay.
I will try to have a look this week.

> Support session Window TVF 
> ---
>
> Key: FLINK-24024
> URL: https://issues.apache.org/jira/browse/FLINK-24024
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Jing Zhang
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
>  
>  # Fix calcite syntax  CALCITE-4337
>  # Introduce session window TVF in Flink
>  





Re: [PR] [FLINK-33328] Allow TwoPhaseCommittingSink WithPreCommitTopology to alter the type of the Committable - mixin approach [flink]

2024-01-08 Thread via GitHub


rodmeneses commented on code in PR #23912:
URL: https://github.com/apache/flink/pull/23912#discussion_r1445467435


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java:
##
@@ -21,18 +21,9 @@
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.streaming.api.datastream.DataStream;
 
 /** Allows expert users to implement a custom topology before {@link 
SinkWriter}. */
 @Experimental
-public interface WithPreWriteTopology extends Sink {
-
-/**
- * Adds an arbitrary topology before the writer. The topology may be used 
to repartition the
- * data.
- *
- * @param inputDataStream the stream of input records.
- * @return the custom topology before {@link SinkWriter}.
- */
-DataStream addPreWriteTopology(DataStream inputDataStream);
-}
+@Deprecated

Review Comment:
   We could explain in the Javadoc why this interface is deprecated, especially 
since it's a new addition.






Re: [PR] [FLINK-33328] Allow TwoPhaseCommittingSink WithPreCommitTopology to alter the type of the Committable - mixin approach [flink]

2024-01-08 Thread via GitHub


rodmeneses commented on code in PR #23912:
URL: https://github.com/apache/flink/pull/23912#discussion_r1445464969


##
flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsWriterState.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A mixin interface for a {@link Sink} which supports a stateful {@link 
StatefulSinkWriter}.
+ *
+ * The {@link Sink} needs to be serializable. All configuration should be 
validated eagerly. The
+ * respective sink writers are transient and will only be created in the 
subtasks on the
+ * taskmanagers.
+ *
+ * @param  The type of the sink's input
+ * @param  The type of the sink writer's state
+ */
+@PublicEvolving
+public interface SupportsWriterState {
+
+/**
+ * Create a {@link StatefulSinkWriter} from a recovered state.
+ *
+ * @param context the runtime context.

Review Comment:
   nit: the Javadoc doesn't include the second parameter, `recoveredState`






Re: [PR] [FLINK-33328] Allow TwoPhaseCommittingSink WithPreCommitTopology to alter the type of the Committable - mixin approach [flink]

2024-01-08 Thread via GitHub


rodmeneses commented on code in PR #23912:
URL: https://github.com/apache/flink/pull/23912#discussion_r1445465244


##
flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsWriterState.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A mixin interface for a {@link Sink} which supports a stateful {@link 
StatefulSinkWriter}.
+ *
+ * The {@link Sink} needs to be serializable. All configuration should be 
validated eagerly. The
+ * respective sink writers are transient and will only be created in the 
subtasks on the
+ * taskmanagers.
+ *
+ * @param  The type of the sink's input
+ * @param  The type of the sink writer's state
+ */
+@PublicEvolving
+public interface SupportsWriterState {
+
+/**
+ * Create a {@link StatefulSinkWriter} from a recovered state.
+ *
+ * @param context the runtime context.
+ * @return A sink writer.
+ * @throws IOException for any failure during creation.
+ */
+StatefulSinkWriter restoreWriter(
+WriterInitContext context, Collection 
recoveredState) throws IOException;
+
+/**
+ * Any stateful sink needs to provide this state serializer and implement 
{@link
+ * StatefulSinkWriter#snapshotState(long)} properly. The respective state 
is used in {@link
+ * #restoreWriter(WriterInitContext, Collection)} on recovery.
+ *
+ * @return the serializer of the writer's state type.
+ */
+SimpleVersionedSerializer getWriterStateSerializer();
+
+/**
+ * A mix-in for {@link SupportsWriterState} that allows users to migrate 
from a sink with a
+ * compatible state to this sink.
+ */
+@PublicEvolving
+interface WithCompatibleState {
+/**
+ * A list of state names of sinks from which the state can be 
restored. For example, the new

Review Comment:
   nit: `A collection of state...`






Re: [PR] [FLINK-33328] Allow TwoPhaseCommittingSink WithPreCommitTopology to alter the type of the Committable - mixin approach [flink]

2024-01-08 Thread via GitHub


rodmeneses commented on code in PR #23912:
URL: https://github.com/apache/flink/pull/23912#discussion_r1445463214


##
flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java:
##
@@ -36,49 +34,43 @@
  * @param  The type of the sink writer's state
  */
 @PublicEvolving
-public interface StatefulSink extends Sink {
+@Deprecated

Review Comment:
   We could add the reason for deprecation to the Javadoc of this interface.






Re: [PR] [FLINK-33328] Allow TwoPhaseCommittingSink WithPreCommitTopology to alter the type of the Committable - mixin approach [flink]

2024-01-08 Thread via GitHub


rodmeneses commented on code in PR #23912:
URL: https://github.com/apache/flink/pull/23912#discussion_r1445462324


##
flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java:
##
@@ -52,11 +54,30 @@ public interface Sink extends Serializable {
  * @param context the runtime context.
  * @return A sink writer.
  * @throws IOException for any failure during creation.
+ * @deprecated Please implement {@link #createWriter(WriterInitContext)}. 
For backward
+ * compatibility reasons - to keep {@link Sink} a functional interface 
- Flink did not
+ * provide a default implementation. New {@link Sink} implementations 
should implement this
+ * method, but it will not be used, and it will be removed in 1.20.0 
release. Do not use
+ * {@link Override} annotation when implementing this method, to 
prevent compilation errors
+ * when migrating to 1.20.x release.
  */
+@Deprecated

Review Comment:
   Do we use the `since` attribute in other places? It might be useful 
to have that in the annotation itself.
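
   For reference, a minimal sketch of what the `since` element looks like on 
`java.lang.Deprecated`. It assumes Java 9 or later, where `since` and 
`forRemoval` were added; whether the project's build target allows this is 
not stated in this thread. `DeprecatedSinceExample`, `LegacySink` and 
`createWriterV2` are hypothetical names used only for illustration, not 
Flink's actual API.

```java
import java.lang.reflect.Method;

// Hypothetical example class; not part of Flink.
class DeprecatedSinceExample {

    /** Hypothetical stand-in for a sink interface. */
    interface LegacySink {
        /**
         * Illustrative method only.
         *
         * @deprecated Use {@link #createWriterV2()} instead.
         */
        @Deprecated(since = "1.19", forRemoval = true)
        String createWriter();

        String createWriterV2();
    }

    /**
     * Returns the value of the annotation's `since` element for the named
     * no-argument method, or null if the method is not annotated.
     */
    public static String deprecatedSince(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName);
            Deprecated deprecated = method.getAnnotation(Deprecated.class);
            return deprecated == null ? null : deprecated.since();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        // prints "1.19"
        System.out.println(deprecatedSince(LegacySink.class, "createWriter"));
    }
}
```

   Because `@Deprecated` is retained at runtime, tooling can read the version 
from the annotation directly, while the Javadoc `@deprecated` tag remains the 
place for the human-readable reason.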






Re: [PR] [FLINK-31481][table] Support enhanced show databases syntax [flink]

2024-01-08 Thread via GitHub


jeyhunkarimov commented on PR #23612:
URL: https://github.com/apache/flink/pull/23612#issuecomment-1881965798

   @flinkbot run azure




