[jira] [Updated] (FLINK-31335) using sql-gateway to submit job to yarn cluster, sql-gateway don't support kerberos

2023-03-16 Thread felixzh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

felixzh updated FLINK-31335:

Affects Version/s: 1.16.0
   (was: 1.16.1)

> using sql-gateway to submit job to yarn cluster, sql-gateway don't support 
> kerberos
> ---
>
> Key: FLINK-31335
> URL: https://issues.apache.org/jira/browse/FLINK-31335
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Affects Versions: 1.16.0
>Reporter: felixzh
>Priority: Major
>  Labels: pull-request-available
>
> When submitting a job to a YARN cluster, sql-gateway doesn't support Kerberos.
> 1. yarn-per-job mode
> -Dexecution.target=yarn-per-job
> 2. yarn-session mode
> -Dexecution.target=yarn-session -Dyarn.application.id=yarnSessionAppID (e.g. 
> application_1677479737242_0052)
> sql-gateway needs to use the SecurityUtils modules.
> By default it uses flink-conf.yaml; it also supports -Dkey=value (e.g. modes 1 and 2 above).
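
The "defaults from flink-conf.yaml, overridable with -Dkey=value" behaviour described above can be sketched in plain Java. This is only an illustration under stated assumptions: `DynamicProperties` and `merge` are hypothetical names, not part of sql-gateway, and the sketch covers only the option layering, not the Kerberos login itself. The principal value is a placeholder.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not sql-gateway code): layers -Dkey=value arguments over file defaults. */
final class DynamicProperties {

    /** Returns a copy of {@code defaults} with every -Dkey=value argument applied on top. */
    static Map<String, String> merge(Map<String, String> defaults, String... args) {
        Map<String, String> merged = new LinkedHashMap<>(defaults);
        for (String arg : args) {
            if (!arg.startsWith("-D")) {
                continue; // not a dynamic property, ignore it
            }
            int eq = arg.indexOf('=');
            if (eq <= 2) {
                // covers both "-Dkey" (no '=') and "-D=value" (empty key)
                throw new IllegalArgumentException("Expected -Dkey=value but got: " + arg);
            }
            merged.put(arg.substring(2, eq), arg.substring(eq + 1));
        }
        return merged;
    }

    public static void main(String[] args) {
        // Stand-in for values that would be read from flink-conf.yaml.
        Map<String, String> defaults = new LinkedHashMap<>();
        defaults.put("security.kerberos.login.principal", "flink@EXAMPLE.COM"); // placeholder
        defaults.put("execution.target", "yarn-per-job");

        Map<String, String> merged =
                merge(
                        defaults,
                        "-Dexecution.target=yarn-session",
                        "-Dyarn.application.id=application_1677479737242_0052");
        merged.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The defaults map is copied rather than mutated, so the file-based configuration stays intact when a session supplies its own overrides.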



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-kafka] chucheng92 commented on a diff in pull request #8: [FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not qui

2023-03-16 Thread via GitHub


chucheng92 commented on code in PR #8:
URL: 
https://github.com/apache/flink-connector-kafka/pull/8#discussion_r1139572887


##
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##
@@ -166,6 +174,68 @@ public void testReaderRegistrationTriggersAssignments() throws Throwable {
         }
     }
 
+    @Test
+    public void testRunWithDiscoverPartitionsOnceToCheckNoMoreSplit() throws Throwable {
+        try (MockSplitEnumeratorContext context =
+                        new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+                KafkaSourceEnumerator enumerator =
+                        createEnumerator(context, DISABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+            // Start the enumerator and it should schedule a one time task to discover and assign
+            // partitions.
+            enumerator.start();
+
+            // Run the partition discover callable and check the partition assignment.
+            runOneTimePartitionDiscovery(context);
+
+            // enumerator noMoreNewPartitionSplits first will be false, when execute
+            // handlePartitionSplitChanges will be set true
+            assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits"))
+                    .isTrue();
+        }
+    }
+
+    @Test
+    public void testRunWithPeriodicPartitionDiscoveryOnceToCheckNoMoreSplit() throws Throwable {
+        try (MockSplitEnumeratorContext context =
+                        new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+                KafkaSourceEnumerator enumerator =
+                        createEnumerator(context, ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+            // Start the enumerator and it should schedule a one time task to discover and assign
+            // partitions.
+            enumerator.start();
+            assertThat(context.getOneTimeCallables()).isEmpty();
+            assertThat(context.getPeriodicCallables())
+                    .as("A periodic partition discovery callable should have been scheduled")
+                    .hasSize(1);
+
+            // enumerator noMoreNewPartitionSplits first will be false, even when execute
+            // handlePartitionSplitChanges it still be false
+            assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits"))
+                    .isFalse();
+        }
+    }
+
+    @Test
+    public void testRunWithDiscoverPartitionsOnceWithZeroMsToCheckNoMoreSplit() throws Throwable {
+        try (MockSplitEnumeratorContext context =
+                        new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+                // set partitionDiscoveryIntervalMs = 0
+                KafkaSourceEnumerator enumerator = createEnumerator(context, 0L)) {
+
+            // Start the enumerator, and it should schedule a one time task to discover and assign
+            // partitions.
+            enumerator.start();
+            runOneTimePartitionDiscovery(context);

Review Comment:
   Yes, good advice. I have fixed it. Please check it, thanks~
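
   For readers following the three tests in the diff above, the behaviour they check can be modelled in a few lines. This is a hypothetical sketch, not the connector's code: `DiscoverySketch` and its members are made-up names, the one-time discovery runs synchronously here instead of through the enumerator context, and treating an interval of 0 the same as a disabled (non-positive) interval is an assumption drawn from the PR title.

```java
/** Hypothetical model of discovery scheduling; not the Kafka connector's implementation. */
final class DiscoverySketch {
    private final long partitionDiscoveryIntervalMs;
    private boolean periodicScheduled = false;
    private boolean noMoreNewPartitionSplits = false;

    DiscoverySketch(long partitionDiscoveryIntervalMs) {
        this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
    }

    /** Schedules periodic discovery for positive intervals, otherwise discovers exactly once. */
    void start() {
        if (partitionDiscoveryIntervalMs > 0) {
            // Discovery repeats, so new splits may still show up later.
            periodicScheduled = true;
        } else {
            // Zero or negative interval: a single pass, after which the source can finish.
            discoverOnce();
        }
    }

    private void discoverOnce() {
        // A real enumerator would fetch partitions and assign splits here.
        noMoreNewPartitionSplits = true;
    }

    boolean isPeriodicScheduled() {
        return periodicScheduled;
    }

    boolean isNoMoreNewPartitionSplits() {
        return noMoreNewPartitionSplits;
    }

    public static void main(String[] args) {
        DiscoverySketch zero = new DiscoverySketch(0L);
        zero.start();
        System.out.println("interval=0 finished discovery: " + zero.isNoMoreNewPartitionSplits());
    }
}
```

   The point of the `> 0` check is that a bounded source can only quit once the flag is set, so an interval of 0 must take the one-time branch rather than the periodic one.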



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31486) Using KeySelector in IterationBody causes ClassNotFoundException

2023-03-16 Thread Jiang Xin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701508#comment-17701508
 ] 

Jiang Xin commented on FLINK-31486:
---

[~zhangzp] I'm afraid not. This issue is most likely caused by an incorrect class 
loader, but FLINK-31255 does not seem to be.

> Using KeySelector in IterationBody causes ClassNotFoundException
> 
>
> Key: FLINK-31486
> URL: https://issues.apache.org/jira/browse/FLINK-31486
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Reporter: Jiang Xin
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>
> When we use CoGroup along with KeySelector in an IterationBody, the following 
> exception occurs.
> {code:java}
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate state partitioner.
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662)
>     at org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96)
>     at org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168)
>     at org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146)
>     at org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1 of type org.apache.flink.api.java.functions.KeySelector in instance of org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector
>     at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
>     at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:501)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:459)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659)
>     ... 17 more
> {code}
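
The trace above fails while a serialized lambda is being read back. As background for the class-loader explanation given in the comment, the following plain-JDK sketch shows the mechanism involved: a serializable lambda is written to the stream as a `java.lang.invoke.SerializedLambda`, and the classes it refers to are looked up through the loader used by `resolveClass` when it is read. Everything here is an assumption for illustration: `LambdaRoundTrip` and its nested `KeySelector` are hypothetical stand-ins, not Flink types, and the sketch shows a successful round trip rather than reproducing the failure.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

final class LambdaRoundTrip {

    /** Hypothetical stand-in for a serializable key selector; not Flink's interface. */
    interface KeySelector<I, K> extends Serializable {
        K getKey(I value);
    }

    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    /** Reads an object, resolving classes through the given loader first. */
    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(data)) {
                    @Override
                    protected Class<?> resolveClass(ObjectStreamClass desc)
                            throws IOException, ClassNotFoundException {
                        try {
                            return Class.forName(desc.getName(), false, loader);
                        } catch (ClassNotFoundException e) {
                            // Fall back to the default lookup (also handles primitives).
                            return super.resolveClass(desc);
                        }
                    }
                }) {
            return in.readObject();
        }
    }

    /** Serializes a lambda, reads it back with this class's loader, and applies it. */
    @SuppressWarnings("unchecked")
    static int roundTripKey(String input) {
        KeySelector<String, Integer> selector = s -> s.length();
        try {
            byte[] data = serialize(selector);
            KeySelector<String, Integer> copy =
                    (KeySelector<String, Integer>)
                            deserialize(data, LambdaRoundTrip.class.getClassLoader());
            return copy.getKey(input);
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTripKey("flink"));
    }
}
```

The round trip works here because the loader passed to `deserialize` is the one that defined the class containing the lambda. Whether a mismatched loader is what happens inside the iteration wrapper operator is the hypothesis stated in the comment, not something this sketch verifies.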





[GitHub] [flink-connector-jdbc] lightzhao commented on pull request #26: [Improve]Increase the valid of judging connection.

2023-03-16 Thread via GitHub


lightzhao commented on PR #26:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/26#issuecomment-1473130167

   @kezhuw @leonardBang @wanglijie95 PTAL.





[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-16 Thread via GitHub


Aitozi commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1139726153


##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java:
##
@@ -503,16 +522,16 @@ public void testViewSchema() throws Exception {
 
         CatalogView catalogView =
                 (CatalogView) hiveCatalog.getTable(new ObjectPath("db1", "v1"));
-        Schema viewSchema = catalogView.getUnresolvedSchema();
+        ResolvedSchema viewSchema =

Review Comment:
   The main problem is `UnresolvedDataType`.
   
   `UnresolvedDataType` does not provide an `equals` method for comparison. That is 
reasonable, because it's hard to say whether two `UnresolvedDataType` instances are 
equal. IMO, the resolved version can be compared.
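
   To illustrate why comparing after resolution is easier, here is a minimal sketch with hypothetical classes (`UnresolvedType` and `ResolvedType` are made up for this example and are not Flink's `UnresolvedDataType` or `DataType`). It assumes an unresolved type only carries deferred resolution logic, so it falls back to identity equality, while the resolved value has value-based equality.

```java
import java.util.function.Supplier;

final class ResolveBeforeCompare {

    /** Hypothetical resolved type with value-based equality. */
    static final class ResolvedType {
        private final String name;

        ResolvedType(String name) {
            this.name = name;
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof ResolvedType && ((ResolvedType) other).name.equals(name);
        }

        @Override
        public int hashCode() {
            return name.hashCode();
        }
    }

    /** Hypothetical unresolved type: it only holds deferred logic, so it keeps identity equality. */
    static final class UnresolvedType {
        private final Supplier<ResolvedType> resolver;

        UnresolvedType(Supplier<ResolvedType> resolver) {
            this.resolver = resolver;
        }

        ResolvedType resolve() {
            return resolver.get();
        }
    }

    public static void main(String[] args) {
        UnresolvedType a = new UnresolvedType(() -> new ResolvedType("INT"));
        UnresolvedType b = new UnresolvedType(() -> new ResolvedType("INT"));
        System.out.println("unresolved equal: " + a.equals(b));
        System.out.println("resolved equal: " + a.resolve().equals(b.resolve()));
    }
}
```

   Two unresolved instances that would produce the same type are still different objects, which is why the test compares the resolved schema against the expected one.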
   






[GitHub] [flink] luoyuxia commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-16 Thread via GitHub


luoyuxia commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1139717230


##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java:
##
@@ -503,16 +522,16 @@ public void testViewSchema() throws Exception {
 
         CatalogView catalogView =
                 (CatalogView) hiveCatalog.getTable(new ObjectPath("db1", "v1"));
-        Schema viewSchema = catalogView.getUnresolvedSchema();
+        ResolvedSchema viewSchema =

Review Comment:
   I don't understand why comparing the `Schema` directly is not feasible/easy.
   Is there any problem if we don't change this test?






[GitHub] [flink] flinkbot commented on pull request #22199: [FLINK-31489][sql-gateway] Fix instable OperationManagerTest.testCloseOperation

2023-03-16 Thread via GitHub


flinkbot commented on PR #22199:
URL: https://github.com/apache/flink/pull/22199#issuecomment-1473119783

   
   ## CI report:
   
   * 698b344967b261ebf458a6a678680a78a9a206fe UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-16 Thread via GitHub


Aitozi commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1139716355


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java:
##
@@ -96,11 +106,104 @@ public class HiveTableUtil {
 
     private HiveTableUtil() {}
 
-    public static TableSchema createTableSchema(
+    /** Create a Flink's Schema by hive client. */
+    public static org.apache.flink.table.api.Schema createSchema(
+            HiveConf hiveConf,
+            Table hiveTable,
+            HiveMetastoreClientWrapper client,
+            HiveShim hiveShim) {
+
+        Tuple4, List, Set, Optional>
+                hiveTableInfo = extractHiveTableInfo(hiveConf, hiveTable, client, hiveShim);
+
+        return createSchema(
+                hiveTableInfo.f0,
+                hiveTableInfo.f1,
+                hiveTableInfo.f2,
+                hiveTableInfo.f3.orElse(null));
+    }
+
+    /** Create a Flink's Schema from Hive table's columns and partition keys. */
+    public static org.apache.flink.table.api.Schema createSchema(
+            List nonPartCols,
+            List partitionKeys,
+            Set notNullColumns,
+            @Nullable UniqueConstraint primaryKey) {
+        return Schema.newBuilder()
+                .fromResolvedSchema(
+                        createResolvedSchema(

Review Comment:
   Makes sense, will apply your commits :)






[jira] [Commented] (FLINK-31489) OperationManagerTest.testCloseOperation failed because it couldn't find a submitted operation

2023-03-16 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701505#comment-17701505
 ] 

lincoln lee commented on FLINK-31489:
-

[~mapohl] It seems to be an unstable test (confirmed offline with [~shengkai], who is 
on vacation). The test code can be improved; I've submitted a fix and 
deprioritized this issue to Critical.

> OperationManagerTest.testCloseOperation failed because it couldn't find a 
> submitted operation
> -
>
> Key: FLINK-31489
> URL: https://issues.apache.org/jira/browse/FLINK-31489
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Gateway
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47218=logs=ce3801ad-3bd5-5f06-d165-34d37e757d90=5e4d9387-1dcc-5885-a901-90469b7e6d2f=13386
> {code}
> Mar 16 02:49:52 [ERROR] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 30.433 s <<< FAILURE! - in 
> org.apache.flink.table.gateway.service.operation.OperationManagerTest
> Mar 16 02:49:52 [ERROR] 
> org.apache.flink.table.gateway.service.operation.OperationManagerTest.testCloseOperation
>   Time elapsed: 0.042 s  <<< ERROR!
> Mar 16 02:49:52 org.apache.flink.table.gateway.api.utils.SqlGatewayException: 
> Can not find the submitted operation in the OperationManager with the 
> 1734d6cf-cf52-40c5-804f-809e48a9818a.
> Mar 16 02:49:52   at 
> org.apache.flink.table.gateway.service.operation.OperationManager.lambda$getOperation$3(OperationManager.java:487)
> Mar 16 02:49:52   at 
> org.apache.flink.table.gateway.service.operation.OperationManager.readLock(OperationManager.java:518)
> Mar 16 02:49:52   at 
> org.apache.flink.table.gateway.service.operation.OperationManager.getOperation(OperationManager.java:482)
> Mar 16 02:49:52   at 
> org.apache.flink.table.gateway.service.operation.OperationManager.awaitOperationTermination(OperationManager.java:149)
> Mar 16 02:49:52   at 
> org.apache.flink.table.gateway.service.operation.OperationManagerTest.testCloseOperation(OperationManagerTest.java:199)
> Mar 16 02:49:52   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> [...]
> {code}





[jira] [Updated] (FLINK-31489) OperationManagerTest.testCloseOperation failed because it couldn't find a submitted operation

2023-03-16 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-31489:

Priority: Critical  (was: Blocker)

> OperationManagerTest.testCloseOperation failed because it couldn't find a 
> submitted operation
> -
>
> Key: FLINK-31489
> URL: https://issues.apache.org/jira/browse/FLINK-31489
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Gateway
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: pull-request-available, test-stability
>





[jira] [Updated] (FLINK-31489) OperationManagerTest.testCloseOperation failed because it couldn't find a submitted operation

2023-03-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31489:
---
Labels: pull-request-available test-stability  (was: test-stability)

> OperationManagerTest.testCloseOperation failed because it couldn't find a 
> submitted operation
> -
>
> Key: FLINK-31489
> URL: https://issues.apache.org/jira/browse/FLINK-31489
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Gateway
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>





[GitHub] [flink] luoyuxia commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-16 Thread via GitHub


luoyuxia commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1139712015


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java:
##
@@ -96,11 +106,104 @@ public class HiveTableUtil {
 
     private HiveTableUtil() {}
 
-    public static TableSchema createTableSchema(
+    /** Create a Flink's Schema by hive client. */
+    public static org.apache.flink.table.api.Schema createSchema(
+            HiveConf hiveConf,
+            Table hiveTable,
+            HiveMetastoreClientWrapper client,
+            HiveShim hiveShim) {
+
+        Tuple4, List, Set, Optional>
+                hiveTableInfo = extractHiveTableInfo(hiveConf, hiveTable, client, hiveShim);
+
+        return createSchema(
+                hiveTableInfo.f0,
+                hiveTableInfo.f1,
+                hiveTableInfo.f2,
+                hiveTableInfo.f3.orElse(null));
+    }
+
+    /** Create a Flink's Schema from Hive table's columns and partition keys. */
+    public static org.apache.flink.table.api.Schema createSchema(
+            List nonPartCols,
+            List partitionKeys,
+            Set notNullColumns,
+            @Nullable UniqueConstraint primaryKey) {
+        return Schema.newBuilder()
+                .fromResolvedSchema(
+                        createResolvedSchema(

Review Comment:
   I agree it's not harmful to do the extra schema conversion, although it'll 
create an extra object and `createSchema` will be called fairly frequently.
   But if we can both save code and reduce schema conversions with minor 
effort, why not do that?
   
   






[GitHub] [flink] lincoln-lil opened a new pull request, #22199: [FLINK-31489][sql-gateway] Fix instable OperationManagerTest.testCloseOperation

2023-03-16 Thread via GitHub


lincoln-lil opened a new pull request, #22199:
URL: https://github.com/apache/flink/pull/22199

   ## What is the purpose of the change
   This is a hotfix for the unstable test 
`OperationManagerTest.testCloseOperation`. The cause is that the concurrent 
closeOperation may finish before awaitOperationTermination(operationHandle), 
which may then hit an expected error when it calls getOperation internally.
   
   ## Brief change log
   Move the awaitOperationTermination call into the assert-thrown block.
   
   ## Verifying this change
   existing testCloseOperation case
   
   ## Does this pull request potentially affect one of the following parts:
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
@Public(Evolving): (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
 - Does this pull request introduce a new feature? (no)
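
   ## Illustration of the race (sketch)
   The ordering problem described above can be shown with a tiny stand-in registry. This is a hedged sketch, not the gateway's code: `CloseRaceSketch` and all of its methods are hypothetical, and the two orderings are replayed sequentially instead of with real threads so the outcome is deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for an operation registry; not the SQL gateway's OperationManager. */
final class CloseRaceSketch {
    private final Map<String, String> operations = new ConcurrentHashMap<>();

    void submit(String handle) {
        operations.put(handle, "RUNNING");
    }

    void close(String handle) {
        operations.remove(handle);
    }

    /** Looks the operation up first, so it fails if close already removed it. */
    void awaitTermination(String handle) {
        getStatus(handle);
    }

    String getStatus(String handle) {
        String status = operations.get(handle);
        if (status == null) {
            throw new IllegalStateException("Can not find the submitted operation: " + handle);
        }
        return status;
    }

    /** Returns true if the action fails with the "not found" error. */
    static boolean throwsNotFound(Runnable action) {
        try {
            action.run();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Ordering A: close wins the race, so awaitTermination itself fails.
        CloseRaceSketch first = new CloseRaceSketch();
        first.submit("op");
        first.close("op");
        System.out.println(
                throwsNotFound(
                        () -> {
                            first.awaitTermination("op");
                            first.getStatus("op");
                        }));

        // Ordering B: await sees the operation, close happens afterwards, the lookup fails.
        CloseRaceSketch second = new CloseRaceSketch();
        second.submit("op");
        System.out.println(
                throwsNotFound(
                        () -> {
                            second.awaitTermination("op");
                            second.close("op");
                            second.getStatus("op");
                        }));
    }
}
```

   With the await call inside the block that expects the failure, both orderings end in the expected error, which is the property the change aims for.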





[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-16 Thread via GitHub


Aitozi commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1139710984


##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java:
##
@@ -503,16 +522,16 @@ public void testViewSchema() throws Exception {
 
         CatalogView catalogView =
                 (CatalogView) hiveCatalog.getTable(new ObjectPath("db1", "v1"));
-        Schema viewSchema = catalogView.getUnresolvedSchema();
+        ResolvedSchema viewSchema =

Review Comment:
   Yes, you are right. But as I commented above, comparing the Schema directly is 
not feasible/easy, so we compare the resolved version with the 
expected one. I think it covers the original requirements.






[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-16 Thread via GitHub


Aitozi commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1139706415


##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/TestSchemaResolver.java:
##
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.UnresolvedDataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+
+/** Testing implementation of {@link SchemaResolver}. */
+public class TestSchemaResolver implements SchemaResolver {
+
+    private final DataTypeFactory dataTypeFactory = new TestingDataTypeFactory();
+    private final Map resolveExpressionTable = new HashMap<>();
+
+    @Override
+    public ResolvedSchema resolve(Schema schema) {
+        final List columns = resolveColumns(schema.getColumns());
+
+        final List watermarkSpecs =
+                schema.getWatermarkSpecs().stream()
+                        .map(this::resolveWatermarkSpecs)
+                        .collect(Collectors.toList());
+
+        final UniqueConstraint primaryKey = resolvePrimaryKey(schema.getPrimaryKey().orElse(null));
+
+        return new ResolvedSchema(columns, watermarkSpecs, primaryKey);
+    }
+
+    private List resolveColumns(List unresolvedColumns) {
+        final Column[] resolvedColumns = new Column[unresolvedColumns.size()];
+
+        int i = 0;
+        for (Schema.UnresolvedColumn unresolvedColumn : unresolvedColumns) {
+            if (unresolvedColumn instanceof Schema.UnresolvedPhysicalColumn) {
+                resolvedColumns[i] =
+                        resolvePhysicalColumn((Schema.UnresolvedPhysicalColumn) unresolvedColumn);
+            } else if (unresolvedColumn instanceof Schema.UnresolvedMetadataColumn) {
+                resolvedColumns[i] =
+                        resolveMetadataColumn((Schema.UnresolvedMetadataColumn) unresolvedColumn);
+            } else if (unresolvedColumn instanceof Schema.UnresolvedComputedColumn) {
+                resolvedColumns[i] =
+                        Column.computed(
+                                        unresolvedColumn.getName(),
+                                        resolveExpression(
+                                                ((Schema.UnresolvedComputedColumn) unresolvedColumn)
+                                                        .getExpression()))
+                                .withComment(unresolvedColumn.getComment().orElse(null));
+            }
+            i++;
+        }
+        return Arrays.asList(resolvedColumns);
+    }
+
+    private Column.PhysicalColumn resolvePhysicalColumn(
+            Schema.UnresolvedPhysicalColumn unresolvedColumn) {
+        return Column.physical(
+                        unresolvedColumn.getName(),
+                        dataTypeFactory.createDataType(unresolvedColumn.getDataType()))
+                .withComment(unresolvedColumn.getComment().orElse(null));
+    }
+
+    private Column.MetadataColumn resolveMetadataColumn(
+            Schema.UnresolvedMetadataColumn unresolvedColumn) {
+        return Column.metadata(
+                        unresolvedColumn.getName(),
+                        dataTypeFactory.createDataType(unresolvedColumn.getDataType()),
+                        unresolvedColumn.getMetadataKey(),
+                        unresolvedColumn.isVirtual())
+

[jira] [Commented] (FLINK-31081) Update Jira links in How To Contribute guide

2023-03-16 Thread Eric Brzezenski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701502#comment-17701502
 ] 

Eric Brzezenski commented on FLINK-31081:
-

Do you mind assigning this to me? I'll update these locations and any others 
that I may find. 

> Update Jira links in How To Contribute guide
> 
>
> Key: FLINK-31081
> URL: https://issues.apache.org/jira/browse/FLINK-31081
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: starter
>
> FLINK-30007 added a description in [the community 
> docs|https://flink.apache.org/community.html#issue-tracker] on how to get 
> access to the Flink Jira. But several other locations mention and link to Jira 
> as well (especially in the how-to-contribute sections). Newcomers might 
> miss the paragraph in the community docs and wonder how they can get a working 
> Jira account.
> This issue is about replacing all the Jira links (where it's useful) with a 
> reference to the issue tracker section in the community docs 
> (https://flink.apache.org/community.html#issue-tracker). That way, they are 
> redirected to the information on how they can gain access to the Flink Jira 
> board.
> Locations that need to be updated are (not exclusively!):
> * https://flink.apache.org/contributing/contribute-code.html
> * https://flink.apache.org/gettinghelp.html
> *  https://flink.apache.org/contributing/how-to-contribute.html



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-16 Thread via GitHub


Aitozi commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1139701322


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java:
##
@@ -96,11 +106,104 @@ public class HiveTableUtil {
 
 private HiveTableUtil() {}
 
-public static TableSchema createTableSchema(
+/** Create a Flink's Schema by hive client. */
+public static org.apache.flink.table.api.Schema createSchema(
+HiveConf hiveConf,
+Table hiveTable,
+HiveMetastoreClientWrapper client,
+HiveShim hiveShim) {
+
+Tuple4, List, Set, 
Optional>
+hiveTableInfo = extractHiveTableInfo(hiveConf, hiveTable, 
client, hiveShim);
+
+return createSchema(
+hiveTableInfo.f0,
+hiveTableInfo.f1,
+hiveTableInfo.f2,
+hiveTableInfo.f3.orElse(null));
+}
+
+/** Create a Flink's Schema from Hive table's columns and partition keys. 
*/
+public static org.apache.flink.table.api.Schema createSchema(
+List nonPartCols,
+List partitionKeys,
+Set notNullColumns,
+@Nullable UniqueConstraint primaryKey) {
+return Schema.newBuilder()
+.fromResolvedSchema(
+createResolvedSchema(

Review Comment:
   > In here, we will first convert to ResolvedSchema and then convert to 
Schema. So why not convert to Schema directly?
   
   It's intended to save code, because creating `ResolvedSchema` and `Schema` 
actually does the same thing. IMO, we do not have to avoid using 
`Schema.newBuilder().fromResolvedSchema().build()` at all costs; I think 
it's harmless, since it is a bridge to convert the ResolvedSchema back to Schema 
when needed.
   
   In this case, we can create `ResolvedSchema` directly from the Hive table 
information. In my original thought, we would not even have to add the dedicated 
method below; the caller could just do it depending on its requirements.
   
   ```
   Schema createSchema(List nonPartCols,
   List partitionKeys,
   Set notNullColumns,
   @Nullable UniqueConstraint primaryKey) {
 return Schema.newBuilder()
   .fromResolvedSchema(
   createResolvedSchema(
   nonPartCols, partitionKeys, notNullColumns, 
primaryKey))
   .build();
   }
   ```
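
   The design point above (build the resolved form once, then derive the other form through a bridge instead of keeping two parallel builders) can be sketched in plain Java. This is only an illustration under stated assumptions: `Resolved`, `Declared`, and `SchemaBridgeSketch` are hypothetical stand-ins, not Flink's `ResolvedSchema` or `Schema`, and the rule that partition keys follow the regular columns is assumed for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class SchemaBridgeSketch {

    /** Hypothetical stand-in for a resolved schema: names with concrete types. */
    static final class Resolved {
        final List<String> names;
        final List<String> types;

        Resolved(List<String> names, List<String> types) {
            this.names = Collections.unmodifiableList(names);
            this.types = Collections.unmodifiableList(types);
        }
    }

    /** Hypothetical stand-in for the declarative schema: "name type" strings. */
    static final class Declared {
        final List<String> declarations;

        private Declared(List<String> declarations) {
            this.declarations = Collections.unmodifiableList(declarations);
        }

        /** The bridge: derive the declarative form from the resolved one. */
        static Declared fromResolved(Resolved resolved) {
            List<String> out = new ArrayList<>();
            for (int i = 0; i < resolved.names.size(); i++) {
                out.add(resolved.names.get(i) + " " + resolved.types.get(i));
            }
            return new Declared(out);
        }
    }

    /** The only place that knows how the column lists map to a schema. */
    static Resolved createResolved(
            List<String[]> nonPartCols, List<String[]> partitionKeys, Set<String> notNullColumns) {
        List<String[]> all = new ArrayList<>(nonPartCols);
        all.addAll(partitionKeys); // assumption: partition keys follow regular columns
        List<String> names = new ArrayList<>();
        List<String> types = new ArrayList<>();
        for (String[] col : all) {
            names.add(col[0]);
            types.add(notNullColumns.contains(col[0]) ? col[1] + " NOT NULL" : col[1]);
        }
        return new Resolved(names, types);
    }

    /** Thin wrapper: reuses createResolved instead of duplicating its logic. */
    static Declared createSchema(
            List<String[]> nonPartCols, List<String[]> partitionKeys, Set<String> notNullColumns) {
        return Declared.fromResolved(createResolved(nonPartCols, partitionKeys, notNullColumns));
    }

    public static void main(String[] args) {
        List<String[]> cols =
                Arrays.asList(new String[] {"id", "INT"}, new String[] {"name", "STRING"});
        List<String[]> parts = Collections.singletonList(new String[] {"dt", "STRING"});
        Set<String> notNull = new HashSet<>(Collections.singletonList("id"));
        System.out.println(createSchema(cols, parts, notNull).declarations);
    }
}
```

   The trade-off in this sketch is one extra conversion step in exchange for a single copy of the mapping logic, which is roughly the argument made in the comment above.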



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30719) flink-runtime-web failed due to a corrupted

2023-03-16 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701501#comment-17701501
 ] 

Leonard Xu commented on FLINK-30719:


[~junhan] Could you take a look at this ticket?

> flink-runtime-web failed due to a corrupted 
> 
>
> Key: FLINK-30719
> URL: https://issues.apache.org/jira/browse/FLINK-30719
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend, Test Infrastructure, Tests
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44954=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=54421a62-0c80-5aad-3319-094ff69180bb=12550
> The build failed due to a corrupted nodejs dependency:
> {code}
> [ERROR] The archive file 
> /__w/1/.m2/repository/com/github/eirslett/node/16.13.2/node-16.13.2-linux-x64.tar.gz
>  is corrupted and will be deleted. Please try the build again.
> {code}





[GitHub] [flink] luoyuxia commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-16 Thread via GitHub


luoyuxia commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1139691598


##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/TestSchemaResolver.java:
##
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.UnresolvedDataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+
+/** Testing implementation of {@link SchemaResolver}. */
+public class TestSchemaResolver implements SchemaResolver {
+
+private final DataTypeFactory dataTypeFactory = new 
TestingDataTypeFactory();
+private final Map resolveExpressionTable = new 
HashMap<>();
+
+@Override
+public ResolvedSchema resolve(Schema schema) {
+final List columns = resolveColumns(schema.getColumns());
+
+final List watermarkSpecs =
+schema.getWatermarkSpecs().stream()
+.map(this::resolveWatermarkSpecs)
+.collect(Collectors.toList());
+
+final UniqueConstraint primaryKey = 
resolvePrimaryKey(schema.getPrimaryKey().orElse(null));
+
+return new ResolvedSchema(columns, watermarkSpecs, primaryKey);
+}
+
+private List resolveColumns(List 
unresolvedColumns) {
+final Column[] resolvedColumns = new Column[unresolvedColumns.size()];
+
+int i = 0;
+for (Schema.UnresolvedColumn unresolvedColumn : unresolvedColumns) {
+if (unresolvedColumn instanceof Schema.UnresolvedPhysicalColumn) {
+resolvedColumns[i] =
+
resolvePhysicalColumn((Schema.UnresolvedPhysicalColumn) unresolvedColumn);
+} else if (unresolvedColumn instanceof 
Schema.UnresolvedMetadataColumn) {
+resolvedColumns[i] =
+
resolveMetadataColumn((Schema.UnresolvedMetadataColumn) unresolvedColumn);
+} else if (unresolvedColumn instanceof 
Schema.UnresolvedComputedColumn) {
+resolvedColumns[i] =
+Column.computed(
+unresolvedColumn.getName(),
+resolveExpression(
+
((Schema.UnresolvedComputedColumn) unresolvedColumn)
+.getExpression()))
+
.withComment(unresolvedColumn.getComment().orElse(null));
+}
+i++;
+}
+return Arrays.asList(resolvedColumns);
+}
+
+private Column.PhysicalColumn resolvePhysicalColumn(
+Schema.UnresolvedPhysicalColumn unresolvedColumn) {
+return Column.physical(
+unresolvedColumn.getName(),
+
dataTypeFactory.createDataType(unresolvedColumn.getDataType()))
+.withComment(unresolvedColumn.getComment().orElse(null));
+}
+
+private Column.MetadataColumn resolveMetadataColumn(
+Schema.UnresolvedMetadataColumn unresolvedColumn) {
+return Column.metadata(
+unresolvedColumn.getName(),
+
dataTypeFactory.createDataType(unresolvedColumn.getDataType()),
+unresolvedColumn.getMetadataKey(),
+unresolvedColumn.isVirtual())
+

[jira] [Assigned] (FLINK-31443) FineGrainedSlotManager maintain some redundant task managers

2023-03-16 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-31443:
--

Assignee: Weihua Hu

> FineGrainedSlotManager maintain some redundant task managers
> 
>
> Key: FLINK-31443
> URL: https://issues.apache.org/jira/browse/FLINK-31443
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Weihua Hu
>Assignee: Weihua Hu
>Priority: Major
>
> implementation of 
> [FLINK-18625|https://issues.apache.org/jira/browse/FLINK-18625] in 
> FineGrainedSlotManager.





[jira] [Assigned] (FLINK-31447) Aligning unit tests of FineGrainedSlotManager with DeclarativeSlotManager

2023-03-16 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-31447:
--

Assignee: Weihua Hu

> Aligning unit tests of FineGrainedSlotManager with DeclarativeSlotManager
> -
>
> Key: FLINK-31447
> URL: https://issues.apache.org/jira/browse/FLINK-31447
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Weihua Hu
>Assignee: Weihua Hu
>Priority: Major
>
> There's the DeclarativeSlotManagerTest that covers some specific issues that 
> should be ported to the fine grained slot manager.





[GitHub] [flink] luoyuxia commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-16 Thread via GitHub


luoyuxia commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1139688985


##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java:
##
@@ -503,16 +522,16 @@ public void testViewSchema() throws Exception {
 
 CatalogView catalogView =
 (CatalogView) hiveCatalog.getTable(new ObjectPath("db1", 
"v1"));
-Schema viewSchema = catalogView.getUnresolvedSchema();
+ResolvedSchema viewSchema =

Review Comment:
   IIUC, this test method is meant to check the `CatalogView` returned by 
`hiveCatalog.getTable`. Since the returned schema is an unresolved `Schema`, why 
compare against a `ResolvedSchema`? That would extend the test scope.






[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-16 Thread via GitHub


Aitozi commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1139685134


##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/TestSchemaResolver.java:
##
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.UnresolvedDataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+
+/** Testing implementation of {@link SchemaResolver}. */
+public class TestSchemaResolver implements SchemaResolver {
+
+private final DataTypeFactory dataTypeFactory = new 
TestingDataTypeFactory();
+private final Map resolveExpressionTable = new 
HashMap<>();
+
+@Override
+public ResolvedSchema resolve(Schema schema) {
+final List columns = resolveColumns(schema.getColumns());
+
+final List watermarkSpecs =
+schema.getWatermarkSpecs().stream()
+.map(this::resolveWatermarkSpecs)
+.collect(Collectors.toList());
+
+final UniqueConstraint primaryKey = 
resolvePrimaryKey(schema.getPrimaryKey().orElse(null));
+
+return new ResolvedSchema(columns, watermarkSpecs, primaryKey);
+}
+
+private List resolveColumns(List 
unresolvedColumns) {
+final Column[] resolvedColumns = new Column[unresolvedColumns.size()];
+
+int i = 0;
+for (Schema.UnresolvedColumn unresolvedColumn : unresolvedColumns) {
+if (unresolvedColumn instanceof Schema.UnresolvedPhysicalColumn) {
+resolvedColumns[i] =
+
resolvePhysicalColumn((Schema.UnresolvedPhysicalColumn) unresolvedColumn);
+} else if (unresolvedColumn instanceof 
Schema.UnresolvedMetadataColumn) {
+resolvedColumns[i] =
+
resolveMetadataColumn((Schema.UnresolvedMetadataColumn) unresolvedColumn);
+} else if (unresolvedColumn instanceof 
Schema.UnresolvedComputedColumn) {
+resolvedColumns[i] =
+Column.computed(
+unresolvedColumn.getName(),
+resolveExpression(
+
((Schema.UnresolvedComputedColumn) unresolvedColumn)
+.getExpression()))
+
.withComment(unresolvedColumn.getComment().orElse(null));
+}
+i++;
+}
+return Arrays.asList(resolvedColumns);
+}
+
+private Column.PhysicalColumn resolvePhysicalColumn(
+Schema.UnresolvedPhysicalColumn unresolvedColumn) {
+return Column.physical(
+unresolvedColumn.getName(),
+
dataTypeFactory.createDataType(unresolvedColumn.getDataType()))
+.withComment(unresolvedColumn.getComment().orElse(null));
+}
+
+private Column.MetadataColumn resolveMetadataColumn(
+Schema.UnresolvedMetadataColumn unresolvedColumn) {
+return Column.metadata(
+unresolvedColumn.getName(),
+
dataTypeFactory.createDataType(unresolvedColumn.getDataType()),
+unresolvedColumn.getMetadataKey(),
+unresolvedColumn.isVirtual())
+

[GitHub] [flink] chucheng92 commented on pull request #22179: [FLINK-31380][table] FLIP-297: Support enhanced show catalogs syntax

2023-03-16 Thread via GitHub


chucheng92 commented on PR #22179:
URL: https://github.com/apache/flink/pull/22179#issuecomment-1473088287

   > @wuchong Hi Jark, I added a lot of test cases and did integration testing 
for a while. Could you help review this feature?
   
   cc @wuchong PTAL~





[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-16 Thread via GitHub


Aitozi commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1139678740


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java:
##
@@ -91,16 +91,50 @@ public static Map 
serializeCatalogTable(ResolvedCatalogTable res
 }
 }
 
+/** Serializes the given {@link ResolvedCatalogView} into a map of string 
properties. */
+public static Map serializeCatalogView(ResolvedCatalogView 
resolvedView) {
+try {
+final Map properties = new HashMap<>();
+
+serializeResolvedSchema(properties, 
resolvedView.getResolvedSchema());
+
+final String comment = resolvedView.getComment();
+if (comment != null && comment.length() > 0) {
+properties.put(COMMENT, comment);
+}
+
+properties.putAll(resolvedView.getOptions());
+
+properties.remove(IS_GENERIC); // reserved option
+
+return properties;
+} catch (Exception e) {
+throw new CatalogException("Error in serializing catalog view.", 
e);
+}
+}
+
 /** Deserializes the given map of string properties into an unresolved 
{@link CatalogTable}. */
 public static CatalogTable deserializeCatalogTable(Map 
properties) {
+return deserializeCatalogTable(properties, null);
+}
+
+/** Deserializes the given map of string properties into an unresolved 
{@link CatalogTable}. */
+public static CatalogTable deserializeCatalogTable(

Review Comment:
   updated






[jira] [Commented] (FLINK-31259) Gateway supports initialization of catalog at startup

2023-03-16 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701494#comment-17701494
 ] 

Jark Wu commented on FLINK-31259:
-

[~lintingbin] I see. What do you think about this? [~fsk119]

> Gateway supports initialization of catalog at startup
> -
>
> Key: FLINK-31259
> URL: https://issues.apache.org/jira/browse/FLINK-31259
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> Support initializing catalogs in the gateway when it starts





[jira] [Updated] (FLINK-31380) Support enhanced show catalogs syntax

2023-03-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31380:
---
Labels: pull-request-available  (was: )

> Support enhanced show catalogs syntax
> -
>
> Key: FLINK-31380
> URL: https://issues.apache.org/jira/browse/FLINK-31380
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Planner
>Reporter: Ran Tao
>Assignee: Ran Tao
>Priority: Major
>  Labels: pull-request-available
>
> As discussed in the FLIP, we will support new syntax for some show operations.
> To avoid bloat, this ticket covers ShowCatalogs only.
>  





[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-16 Thread via GitHub


Aitozi commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1139673570


##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java:
##
@@ -503,16 +522,16 @@ public void testViewSchema() throws Exception {
 
 CatalogView catalogView =
 (CatalogView) hiveCatalog.getTable(new ObjectPath("db1", 
"v1"));
-Schema viewSchema = catalogView.getUnresolvedSchema();
+ResolvedSchema viewSchema =

Review Comment:
   Why? We need to compare with ResolvedSchema.






[GitHub] [flink] flinkbot commented on pull request #22198: [FLINK-30923][documentation] Provide single script for installing Hugo

2023-03-16 Thread via GitHub


flinkbot commented on PR #22198:
URL: https://github.com/apache/flink/pull/22198#issuecomment-1473083751

   
   ## CI report:
   
   * 68d441a953af8352554af6e55455aab097fcf386 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Resolved] (FLINK-30011) HiveCatalogGenericMetadataTest azure CI failed due to catalog does not exist

2023-03-16 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia resolved FLINK-30011.
--
Resolution: Duplicate

> HiveCatalogGenericMetadataTest azure CI failed due to catalog does not exist
> 
>
> Key: FLINK-30011
> URL: https://issues.apache.org/jira/browse/FLINK-30011
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.16.1
>Reporter: Leonard Xu
>Priority: Major
>
> {noformat}
> Nov 13 01:55:18 [ERROR]   
> HiveCatalogHiveMetadataTest>CatalogTest.testGetPartitionStats:1212 » Catalog 
> F...
> Nov 13 01:55:18 [ERROR]   
> HiveCatalogHiveMetadataTest>CatalogTest.testGetPartition_PartitionNotExist:1160
>  » Catalog
> Nov 13 01:55:18 [ERROR]   
> HiveCatalogHiveMetadataTest>CatalogTest.testGetPartition_PartitionSpecInvalid_invalidPartitionSpec:1124
>  » Catalog
> Nov 13 01:55:18 [ERROR]   
> HiveCatalogHiveMetadataTest>CatalogTest.testGetPartition_PartitionSpecInvalid_sizeNotEqual:1139
>  » Catalog
> Nov 13 01:55:18 [ERROR]   
> HiveCatalogHiveMetadataTest>CatalogTest.testGetPartition_TableNotPartitioned:1110
>  » Catalog
> Nov 13 01:55:18 [ERROR]   
> HiveCatalogHiveMetadataTest>CatalogTest.testGetTableStats_TableNotExistException:1201
>  » Catalog
> Nov 13 01:55:18 [ERROR]   
> HiveCatalogHiveMetadataTest>CatalogTest.testGetTable_TableNotExistException:323
>  » Catalog
> Nov 13 01:55:18 [ERROR]   HiveCatalogHiveMetadataTest.testHiveStatistics:251 
> » Catalog Failed to create ...
> Nov 13 01:55:18 [ERROR]   
> HiveCatalogHiveMetadataTest>CatalogTest.testListFunctions:749 » Catalog 
> Failed...
> Nov 13 01:55:18 [ERROR]   
> HiveCatalogHiveMetadataTest>CatalogTest.testListPartitionPartialSpec:1188 » 
> Catalog
> Nov 13 01:55:18 [ERROR]   
> HiveCatalogHiveMetadataTest>CatalogTest.testListTables:498 » Catalog Failed 
> to...
> Nov 13 01:55:18 [ERROR]   
> HiveCatalogHiveMetadataTest>CatalogTest.testListView:620 » Catalog Failed to 
> c...
> Nov 13 01:55:18 [ERROR]   
> HiveCatalogHiveMetadataTest>CatalogTest.testPartitionExists:1174 » Catalog 
> Fai...
> Nov 13 01:55:18 [ERROR]   
> HiveCatalogHiveMetadataTest>CatalogTest.testRenameTable_TableAlreadyExistException:483
>  » Catalog
> Nov 13 01:55:18 [ERROR]   
> HiveCatalogHiveMetadataTest>CatalogTest.testRenameTable_TableNotExistException:465
>  » Catalog
> Nov 13 01:55:18 [ERROR]   
> HiveCatalogHiveMetadataTest>CatalogTest.testRenameTable_TableNotExistException_ignored:477
>  » Catalog
> Nov 13 01:55:18 [ERROR]   
> HiveCatalogHiveMetadataTest>CatalogTest.testRenameTable_nonPartitionedTable:451
>  » Catalog
> Nov 13 01:55:18 [ERROR]   
> HiveCatalogHiveMetadataTest>CatalogTest.testRenameView:637 » Catalog Failed 
> to...
> Nov 13 01:55:18 [ERROR]   
> HiveCatalogHiveMetadataTest>CatalogTest.testTableExists:510 » Catalog Failed 
> t...
> Nov 13 01:55:18 [ERROR]   
> HiveCatalogHiveMetadataTest.testViewCompatibility:115 » Catalog Failed to 
> crea...
> Nov 13 01:55:18 [INFO] 
> Nov 13 01:55:18 [ERROR] Tests run: 361, Failures: 0, Errors: 132, Skipped: 0
> {noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43104=logs=245e1f2e-ba5b-5570-d689-25ae21e5302f=d04c9862-880c-52f5-574b-a7a79fef8e0f





[jira] [Commented] (FLINK-30923) Provide single script for installing Hugo

2023-03-16 Thread Eric Brzezenski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701489#comment-17701489
 ] 

Eric Brzezenski commented on FLINK-30923:
-

Would you mind assigning this ticket to me, please? I think I understood the 
description and pushed a change: https://github.com/apache/flink/pull/22198

> Provide single script for installing Hugo
> -
>
> Key: FLINK-30923
> URL: https://issues.apache.org/jira/browse/FLINK-30923
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available, starter
>
> Currently, we install Hugo in multiple locations. In the past, this 
> caused different Hugo versions to be used for building the docs, depending on 
> which script triggered the docs build (see 
> [ci/docs.sh|https://github.com/apache/flink/blob/de368acf0038328a751507a2fa7cb0989d6312e7/tools/ci/docs.sh#L20]
>  and 
> [.github/workflows/docs.sh:26|https://github.com/apache/flink/blob/278642219ebf4b68a02a3901fe06a7cb006d105b/.github/workflows/docs.sh#L26]).
>  We could have a {{docs/setup_hugo.sh}} next to {{docs/setup_docs.sh}} that 
> provides this functionality.





[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-16 Thread via GitHub


Aitozi commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1139668108


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java:
##
@@ -682,11 +682,11 @@ private RelNode handleDestSchema(
 return queryRelNode;
 }
 
+ResolvedCatalogTable resolvedCatalogTable = 
catalogManager.resolveCatalogTable(destTable);

Review Comment:
   forgot it, updated






[jira] [Updated] (FLINK-30923) Provide single script for installing Hugo

2023-03-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30923:
---
Labels: pull-request-available starter  (was: starter)

> Provide single script for installing Hugo
> -
>
> Key: FLINK-30923
> URL: https://issues.apache.org/jira/browse/FLINK-30923
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available, starter
>
> Currently, we install Hugo in multiple locations. In the past, this 
> caused different Hugo versions to be used for building the docs, depending on 
> which script triggered the docs build (see 
> [ci/docs.sh|https://github.com/apache/flink/blob/de368acf0038328a751507a2fa7cb0989d6312e7/tools/ci/docs.sh#L20]
>  and 
> [.github/workflows/docs.sh:26|https://github.com/apache/flink/blob/278642219ebf4b68a02a3901fe06a7cb006d105b/.github/workflows/docs.sh#L26]).
>  We could have a {{docs/setup_hugo.sh}} next to {{docs/setup_docs.sh}} that 
> provides this functionality.





[GitHub] [flink] EricBrzezenski opened a new pull request, #22198: [FLINK-30923][documentation] Provide single script for installing Hugo

2023-03-16 Thread via GitHub


EricBrzezenski opened a new pull request, #22198:
URL: https://github.com/apache/flink/pull/22198

   ## What is the purpose of the change
   
   Add a single source for downloading Hugo artifacts for building the 
documentation.
   
   ## Brief change log
   
   - Consolidating Hugo artifact downloads to a single location
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): No
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: No
 - The serializers: No
 - The runtime per-record code paths (performance sensitive): No
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: No
 - The S3 file system connector: No
   
   ## Documentation
   
 - Does this pull request introduce a new feature? No
 - If yes, how is the feature documented? (N/A)
   





[jira] [Updated] (FLINK-31432) Introduce a special StoreWriteOperator to deal with schema changes

2023-03-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31432:
---
Labels: pull-request-available  (was: )

> Introduce a special StoreWriteOperator to deal with schema changes
> --
>
> Key: FLINK-31432
> URL: https://issues.apache.org/jira/browse/FLINK-31432
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Affects Versions: table-store-0.4.0
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.4.0
>
>
> Currently \{{StoreWriteOperator}} is not able to deal with schema changes. We 
> need to introduce a special \{{StoreWriteOperator}} to deal with schema 
> changes.





[GitHub] [flink] luoyuxia commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-16 Thread via GitHub


luoyuxia commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1139647818


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java:
##
@@ -91,16 +91,50 @@ public static Map 
serializeCatalogTable(ResolvedCatalogTable res
 }
 }
 
+/** Serializes the given {@link ResolvedCatalogView} into a map of string 
properties. */
+public static Map serializeCatalogView(ResolvedCatalogView 
resolvedView) {
+try {
+final Map properties = new HashMap<>();
+
+serializeResolvedSchema(properties, 
resolvedView.getResolvedSchema());
+
+final String comment = resolvedView.getComment();
+if (comment != null && comment.length() > 0) {
+properties.put(COMMENT, comment);
+}
+
+properties.putAll(resolvedView.getOptions());
+
+properties.remove(IS_GENERIC); // reserved option
+
+return properties;
+} catch (Exception e) {
+throw new CatalogException("Error in serializing catalog view.", 
e);
+}
+}
+
 /** Deserializes the given map of string properties into an unresolved 
{@link CatalogTable}. */
 public static CatalogTable deserializeCatalogTable(Map 
properties) {
+return deserializeCatalogTable(properties, null);
+}
+
+/** Deserializes the given map of string properties into an unresolved 
{@link CatalogTable}. */
+public static CatalogTable deserializeCatalogTable(

Review Comment:
   Please add a comment for the parameter `fallbackKey`. It took me some time to 
understand it.






[GitHub] [flink] luoyuxia commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-16 Thread via GitHub


luoyuxia commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1139614246


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java:
##
@@ -96,11 +106,104 @@ public class HiveTableUtil {
 
 private HiveTableUtil() {}
 
-public static TableSchema createTableSchema(
+/** Create a Flink's Schema by hive client. */
+public static org.apache.flink.table.api.Schema createSchema(
+HiveConf hiveConf,
+Table hiveTable,
+HiveMetastoreClientWrapper client,
+HiveShim hiveShim) {
+
+Tuple4, List, Set, 
Optional>
+hiveTableInfo = extractHiveTableInfo(hiveConf, hiveTable, 
client, hiveShim);
+
+return createSchema(
+hiveTableInfo.f0,
+hiveTableInfo.f1,
+hiveTableInfo.f2,
+hiveTableInfo.f3.orElse(null));
+}
+
+/** Create a Flink's Schema from Hive table's columns and partition keys. 
*/
+public static org.apache.flink.table.api.Schema createSchema(
+List nonPartCols,
+List partitionKeys,
+Set notNullColumns,
+@Nullable UniqueConstraint primaryKey) {
+return Schema.newBuilder()
+.fromResolvedSchema(
+createResolvedSchema(
+nonPartCols, partitionKeys, notNullColumns, 
primaryKey))
+.build();
+}
+
+/** Create a Flink's ResolvedSchema from Hive table's columns and 
partition keys. */
+public static ResolvedSchema createResolvedSchema(
+List nonPartCols,
+List partitionKeys,
+Set notNullColumns,
+@Nullable UniqueConstraint primaryKey) {
+List allCols = new ArrayList<>(nonPartCols);
+allCols.addAll(partitionKeys);
+
+// PK columns cannot be null
+if (primaryKey != null) {
+notNullColumns.addAll(primaryKey.getColumns());
+}
+
+Tuple2 columnInformation =
+extractColumnInformation(allCols, notNullColumns);
+
+org.apache.flink.table.catalog.UniqueConstraint pk = null;
+if (primaryKey != null) {
+pk =
+org.apache.flink.table.catalog.UniqueConstraint.primaryKey(
+primaryKey.getName(), primaryKey.getColumns());
+}
+return new ResolvedSchema(
+IntStream.range(0, columnInformation.f0.length)
+.mapToObj(
+i ->
+Column.physical(
+columnInformation.f0[i], 
columnInformation.f1[i]))
+.collect(Collectors.toList()),
+new ArrayList<>(),

Review Comment:
   nit:
   ` Collections.emptyList();`
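
   To illustrate the nit, here is a minimal, self-contained sketch in plain Java (no Flink classes; `EmptyListNit` and `rejectsMutation` are hypothetical names used only for this illustration). It shows the practical difference between the two options: `Collections.emptyList()` returns an immutable empty list, while `new ArrayList<>()` allocates a fresh mutable one. Whether the immutable variant is safe at this call site depends on the receiving constructor not mutating the list, which is an assumption here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class EmptyListNit {

    /** Returns true if the given list refuses to accept a new element. */
    public static boolean rejectsMutation(List<String> list) {
        try {
            list.add("x");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A fresh, mutable list: callers may add to it later.
        List<String> mutable = new ArrayList<>();
        // An immutable empty list: any attempt to add throws.
        List<String> immutable = Collections.emptyList();

        System.out.println("ArrayList rejects mutation: " + rejectsMutation(mutable));
        System.out.println("emptyList rejects mutation: " + rejectsMutation(immutable));
    }
}
```

   The immutable variant documents the intent ("there is nothing here") and avoids an allocation, which is presumably the point of the nit.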



##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java:
##
@@ -259,27 +261,55 @@ void testCreateTableWithConstraints() throws Exception {
 assumeThat(HiveVersionTestUtil.HIVE_310_OR_LATER).isTrue();
 HiveCatalog hiveCatalog = (HiveCatalog) catalog;
 hiveCatalog.createDatabase(db1, createDb(), false);
-TableSchema.Builder builder = TableSchema.builder();
-builder.fields(
-new String[] {"x", "y", "z"},
-new DataType[] {
-DataTypes.INT().notNull(), 
DataTypes.TIMESTAMP(9).notNull(), DataTypes.BIGINT()
-});
-builder.primaryKey("pk_name", new String[] {"x"});
+ResolvedSchema resolvedSchema =
+new ResolvedSchema(
+Arrays.asList(
+Column.physical("x", 
DataTypes.INT().notNull()),
+Column.physical("y", 
DataTypes.TIMESTAMP(9).notNull()),
+Column.physical("z", 
DataTypes.BIGINT().notNull())),
+new ArrayList<>(),
+
org.apache.flink.table.catalog.UniqueConstraint.primaryKey(
+"pk_name", Collections.singletonList("x")));
+
 hiveCatalog.createTable(
 path1,
-new CatalogTableImpl(builder.build(), 
getBatchTableProperties(), null),
+new ResolvedCatalogTable(
+CatalogTable.of(
+
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
+null,
+new ArrayList<>(),
+getBatchTableProperties()),
+resolvedSchema),
 false);
 CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(path1);
-

[jira] [Assigned] (FLINK-31445) Split resource allocate/release related logic from FineGrainedSlotManager to TaskManagerTracker

2023-03-16 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-31445:
--

Assignee: Weihua Hu

> Split resource allocate/release related logic from FineGrainedSlotManager to 
> TaskManagerTracker
> ---
>
> Key: FLINK-31445
> URL: https://issues.apache.org/jira/browse/FLINK-31445
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Weihua Hu
>Assignee: Weihua Hu
>Priority: Major
>  Labels: pull-request-available
>
> Currently the FineGrainedSlotManager is responsible for both slot allocation and 
> resource request/release. This makes the logic of FineGrainedSlotManager 
> complicated, so we will move the task-manager-related work from 
> FineGrainedSlotManager to TaskManagerTracker, which already tracks task 
> managers but does not yet handle request/release.





[jira] [Assigned] (FLINK-31444) FineGrainedSlotManager reclaims slots when job is finished

2023-03-16 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-31444:
--

Assignee: Weihua Hu

> FineGrainedSlotManager reclaims slots when job is finished
> --
>
> Key: FLINK-31444
> URL: https://issues.apache.org/jira/browse/FLINK-31444
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Weihua Hu
>Assignee: Weihua Hu
>Priority: Major
>  Labels: pull-request-available
>
> implementation of 
> [FLINK-21751|https://issues.apache.org/jira/browse/FLINK-21751] in 
> FineGrainedSlotManager





[jira] [Assigned] (FLINK-31441) FineGrainedSlotManager support select slot evenly

2023-03-16 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-31441:
--

Assignee: Weihua Hu

> FineGrainedSlotManager support select slot evenly
> -
>
> Key: FLINK-31441
> URL: https://issues.apache.org/jira/browse/FLINK-31441
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Weihua Hu
>Assignee: Weihua Hu
>Priority: Major
>  Labels: pull-request-available
>
> With [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122], we 
> support spreading tasks out evenly across available task managers. 
> FineGrainedSlotManager should support this as well.





[jira] [Assigned] (FLINK-31439) FLIP-298: Unifying the Implementation of SlotManager

2023-03-16 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-31439:
--

Assignee: Weihua Hu

> FLIP-298: Unifying the Implementation of SlotManager
> 
>
> Key: FLINK-31439
> URL: https://issues.apache.org/jira/browse/FLINK-31439
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Weihua Hu
>Assignee: Weihua Hu
>Priority: Major
> Fix For: 1.18.0
>
>
> This is an umbrella ticket for 
> [FLIP-298|https://cwiki.apache.org/confluence/display/FLINK/FLIP-298%3A+Unifying+the+Implementation+of+SlotManager].





[jira] [Closed] (FLINK-31430) Support migrating states between different instances of TableWriteImpl and AbstractFileStoreWrite

2023-03-16 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-31430.

Resolution: Fixed

master: 1af16a5a1cb27b6b72bbcef9e5862fda84d8c996

> Support migrating states between different instances of TableWriteImpl and 
> AbstractFileStoreWrite
> -
>
> Key: FLINK-31430
> URL: https://issues.apache.org/jira/browse/FLINK-31430
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Affects Versions: table-store-0.4.0
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.4.0
>
>
> Currently {{Table}} and {{TableWrite}} in Flink Table Store have a fixed 
> schema. However, to consume schema changes, Flink Table Store CDC sinks should 
> be able to change their schema during a streaming job.
> This requires us to pause and store the states of a {{TableWrite}}, then 
> create a {{TableWrite}} with the newer schema and recover the states in the new 
> {{TableWrite}}.





[GitHub] [flink] chucheng92 commented on pull request #22192: [BP-1.16][FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not quit

2023-03-16 Thread via GitHub


chucheng92 commented on PR #22192:
URL: https://github.com/apache/flink/pull/22192#issuecomment-1473068221

   @flinkbot run azure





[jira] [Commented] (FLINK-31259) Gateway supports initialization of catalog at startup

2023-03-16 Thread Darcy Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701476#comment-17701476
 ] 

Darcy Lin commented on FLINK-31259:
---

[~jark] 
[FLIP-295|https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Introduce+Pluggable+Catalog+Management]
 may indeed be able to solve this problem, but I feel that it is aimed more at 
catalog storage. If sql-gateway supported an " -i, --init 
" parameter like sql-client, or a "sql-gateway-defaults.yaml" configuration file like the one in 
[ververica-flink-sql-gateway|https://github.com/ververica/flink-sql-gateway/blob/master/conf],
 that would solve this problem very well, because it is not just catalogs: some UDF 
initialization and so on may also be involved.

> Gateway supports initialization of catalog at startup
> -
>
> Key: FLINK-31259
> URL: https://issues.apache.org/jira/browse/FLINK-31259
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> Support initializing catalogs in the gateway when it starts





[GitHub] [flink-web] Myasuka commented on a diff in pull request #618: Announcement blogpost for the 1.17 release

2023-03-16 Thread via GitHub


Myasuka commented on code in PR #618:
URL: https://github.com/apache/flink-web/pull/618#discussion_r1135085057


##
docs/content/posts/2023-03-09-release-1.17.0.md:
##
@@ -0,0 +1,487 @@
+---
+authors:
+- LeonardXu:
+  name: "Leonard Xu"
+  twitter: Leonardxbj
+date: "2023-03-09T08:00:00Z" #FIXME: Change to the actual release date, also 
the date in the filename, and the directory name of linked images
+subtitle: ""
+title: Announcing the Release of Apache Flink 1.17
+aliases:
+- /news/2023/03/09/release-1.17.0.html #FIXME: Change to the actual release 
date
+---
+
+The Apache Flink PMC is pleased to announce Apache Flink release 1.17.0. Apache
+Flink is the leading stream processing standard, and the concept of unified
+stream and batch data processing is being successfully adopted in more and more
+companies. Thanks to our excellent community and contributors, Apache Flink
+continues to grow as a technology and remains one of the most active projects 
in
+the Apache Software Foundation. Flink 1.17 had 173 contributors 
enthusiastically
+participating and saw the completion of 7 FLIPs and 600+ issues, bringing many
+exciting new features and improvements to the community.
+
+
+# Towards Streaming Warehouses
+
+In order to achieve greater efficiency in the realm of [streaming
+warehouse](https://www.alibabacloud.com/blog/more-than-computing-a-new-era-led-by-the-warehouse-architecture-of-apache-flink_598821),
+Flink 1.17 contains substantial improvements to both the performance of batch
+processing and the semantics of streaming processing. These improvements
+represent a significant stride towards the creation of a more efficient and
+streamlined data warehouse, capable of processing large quantities of data in
+real-time.
+
+For batch processing, this release includes several new features and
+improvements:
+
+* **Streaming Warehouse API:**
+  
[FLIP-282](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235838061)
+  introduces the new Delete and Update API in Flink SQL which works in(only) 
batch
+  mode. External storage systems like Flink Table Store can implement row-level
+  updates via this new API. The ALTER TABLE syntax is enhanced by including the
+  ability to ADD/MODIFY/DROP columns, primary keys, and watermarks, making it
+  easier for users to maintain their table schema.
+* **Batch Execution Improvements:** Execution of batch workloads has been
+  significantly improved in Flink 1.17 in terms of performance, stability and
+  usability. Performance wise, a 26% TPC-DS improvement is achieved with
+  strategy and operator optimizations, such as new join reordering and adaptive
+  local hash aggregation, Hive aggregate functions improvements, and the hybrid
+  shuffle mode enhancements. Stability wise, speculative execution now supports
+  all operators, and the Adaptive Batch Scheduler is more robust against data
+  skew. Usability wise, the tuning effort required for batch workloads has been
+  reduced. The Adaptive Batch Scheduler is now the default scheduler in batch 
mode.
+  The hybrid shuffle is compatible with speculative execution and the Adaptive 
+  Batch Scheduler, next to various configuration simplifications.
+* **SQL Client/Gateway:** Apache Flink 1.17 introduces the "gateway mode" for
+  SQL Client, allowing users to submit SQL queries to a SQL Gateway for 
enhanced
+  functionality. Users can use SQL statements to manage job lifecycles,
+  including displaying job information and stopping running jobs.  This 
provides
+  a powerful tool for managing Flink jobs.
+
+For stream processing, the following features and improvements are realized:
+
+* **Streaming SQL Semantics:** Non-deterministic operations may bring incorrect
+  results or exceptions which is a challenging topic in streaming SQL. 
Incorrect
+  optimization plans and functional issues have been fixed, and the 
experimental
+  feature of 
[PLAN_ADVICE](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/explain/#explaindetails)
+  is introduced to inform of potential correctness risks and optimization
+  suggestions to SQL users.
+* **Checkpoint Improvements:** The generic incremental checkpoint improvements
+  enhance the speed and stability of the checkpoint procedure, and the 
unaligned
+  checkpoint has improved  stability under backpressure and is production-ready
+  in Flink 1.17. Users can manually trigger checkpoints with self-defined
+  checkpoint types while a job is running with the newly introduced REST
+  interface for triggering checkpoints.
+* **Watermark Alignment Enhancement:** Efficient watermark processing directly
+  affects the execution efficiency of event time applications. In Flink 1.17,
+  
[FLIP-217](https://cwiki.apache.org/confluence/display/FLINK/FLIP-217%3A+Support+watermark+alignment+of+source+splits)
+  introduces an improvement to watermark alignment by aligning data emission
+  across splits within a source operator. This 

[jira] [Updated] (FLINK-31486) Using KeySelector in IterationBody causes ClassNotFoundException

2023-03-16 Thread Jiang Xin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xin updated FLINK-31486:
--
Description: 
When we use CoGroup along with KeySelector in an IterationBody, the following 
exception occurs.
{code:java}
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate state partitioner.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662)
    at org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96)
    at org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168)
    at org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146)
    at org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1 of type org.apache.flink.api.java.functions.KeySelector in instance of org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:501)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:459)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659)
    ... 17 more
{code}

  was:
When we use CoGroup along with KeySelector in an IterationBody, the following 
exception occurs.
{code:java}
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could 
not instantiate state partitioner. at 
org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662)
 at 
org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96)
 at 
org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168)
 at 
org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146)
 at 
org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68)
 at 

[GitHub] [flink] ruanhang1993 commented on pull request #21589: [FLINK-25509][connector-base] Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-03-16 Thread via GitHub


ruanhang1993 commented on PR #21589:
URL: https://github.com/apache/flink/pull/21589#issuecomment-1473030088

   @flinkbot run azure





[jira] [Commented] (FLINK-31259) Gateway supports initialization of catalog at startup

2023-03-16 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701465#comment-17701465
 ] 

Jark Wu commented on FLINK-31259:
-

[~lintingbin] yes, I think this is a valuable feature, and I think 
[FLIP-295|https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Introduce+Pluggable+Catalog+Management]
 can address this. (Note that FLIP-295 is still being drafted and I haven't 
reviewed it yet.)

> Gateway supports initialization of catalog at startup
> -
>
> Key: FLINK-31259
> URL: https://issues.apache.org/jira/browse/FLINK-31259
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> Support initializing catalogs in the gateway when it starts





[GitHub] [flink] JunRuiLee commented on a diff in pull request #21672: [FLINK-30683][runtime] Make adaptive batch scheduler as the default batch scheduler

2023-03-16 Thread via GitHub


JunRuiLee commented on code in PR #21672:
URL: https://github.com/apache/flink/pull/21672#discussion_r1090132553


##
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml:
##
@@ -71,17 +75,21 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY 
$1 ORDER BY $2 NULLS
   

[jira] [Closed] (FLINK-31459) add UPDATE COLUMN POSITION for flink table store

2023-03-16 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-31459.

  Assignee: Jun Zhang
Resolution: Fixed

master: 14d87e4a85e716771c59a5e2fbb840fd1852a0f6

> add UPDATE COLUMN POSITION for flink table store
> 
>
> Key: FLINK-31459
> URL: https://issues.apache.org/jira/browse/FLINK-31459
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Affects Versions: table-store-0.3.1
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.4.0
>
>






[GitHub] [flink] chucheng92 commented on pull request #22192: [BP-1.16][FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not quit

2023-03-16 Thread via GitHub


chucheng92 commented on PR #22192:
URL: https://github.com/apache/flink/pull/22192#issuecomment-1473019407

   @flinkbot run azure





[GitHub] [flink] chucheng92 commented on pull request #22192: [BP-1.16][FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not quit

2023-03-16 Thread via GitHub


chucheng92 commented on PR #22192:
URL: https://github.com/apache/flink/pull/22192#issuecomment-1473019274

   The BP-1.16 CI failure is caused by 
[FLINK-31477](https://issues.apache.org/jira/browse/FLINK-31477).
   





[GitHub] [flink] chucheng92 commented on pull request #22192: [BP-1.16][FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not quit

2023-03-16 Thread via GitHub


chucheng92 commented on PR #22192:
URL: https://github.com/apache/flink/pull/22192#issuecomment-1473018637

   > @chucheng92 please see my comments in the original PR 
[apache/flink-connector-kafka#8](https://github.com/apache/flink-connector-kafka/pull/8)
 before merging this one.
   
   Yes, I have added it. Thanks @RamanVerma.





[jira] [Commented] (FLINK-31319) Kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not quit

2023-03-16 Thread Ran Tao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701462#comment-17701462
 ] 

Ran Tao commented on FLINK-31319:
-

[~ramanverma] thanks.
BP-1.16: [https://github.com/apache/flink/pull/22192]
BP-1.17: [https://github.com/apache/flink/pull/22193]
original: [https://github.com/apache/flink-connector-kafka/pull/8]

> Kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not 
> quit
> -
>
> Key: FLINK-31319
> URL: https://issues.apache.org/jira/browse/FLINK-31319
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.0, 1.16.1
>Reporter: Ran Tao
>Assignee: Ran Tao
>Priority: Critical
>  Labels: pull-request-available
> Attachments: image-2023-03-04-01-37-29-360.png, 
> image-2023-03-04-01-39-20-352.png, image-2023-03-04-01-40-44-124.png, 
> image-2023-03-04-01-41-55-664.png
>
>
> According to the Kafka option description, partitionDiscoveryIntervalMs <= 0 
> means that partition discovery is disabled.
> !image-2023-03-04-01-37-29-360.png|width=781,height=147!
> The Kafka enumerator start-up logic follows this rule:
> !image-2023-03-04-01-39-20-352.png|width=465,height=311!
> However, the inner handlePartitionSplitChanges uses the wrong if condition 
> (< 0):
> !image-2023-03-04-01-40-44-124.png|width=576,height=237!
>  
> As a result, noMoreNewPartitionSplits is never set to true when the interval is 0. 
> !image-2023-03-04-01-41-55-664.png|width=522,height=610!
> Consequently, a bounded source never calls signalNoMoreSplits, so it does not quit.
> Besides, the two branches of the if condition should be mutually exclusive.
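
The mismatch described above can be sketched in a few lines of plain Java. This is only an illustration under stated assumptions, not the connector's actual code: `PartitionDiscoveryCheck` and both method names are hypothetical, and the only facts taken from the ticket are that an interval <= 0 means "disabled" and that the two branches must be mutually exclusive.

```java
public class PartitionDiscoveryCheck {

    /** Periodic discovery runs only for a strictly positive interval. */
    public static boolean isPeriodicDiscoveryEnabled(long partitionDiscoveryIntervalMs) {
        return partitionDiscoveryIntervalMs > 0;
    }

    /**
     * After the single initial discovery, no further splits can appear exactly when
     * periodic discovery is disabled. Defining it as the negation of the check above
     * keeps the two branches mutually exclusive, including for an interval of 0.
     */
    public static boolean noMoreNewPartitionSplits(long partitionDiscoveryIntervalMs) {
        return !isPeriodicDiscoveryEnabled(partitionDiscoveryIntervalMs);
    }

    public static void main(String[] args) {
        for (long interval : new long[] {-1L, 0L, 30_000L}) {
            System.out.printf(
                    "interval=%d periodic=%b noMoreNewSplits=%b%n",
                    interval,
                    isPeriodicDiscoveryEnabled(interval),
                    noMoreNewPartitionSplits(interval));
        }
    }
}
```

With a `< 0` check in the second method instead, an interval of 0 would satisfy neither branch, which matches the symptom reported here.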





[jira] [Commented] (FLINK-30719) flink-runtime-web failed due to a corrupted

2023-03-16 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701461#comment-17701461
 ] 

Leonard Xu commented on FLINK-30719:


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47149=logs=ae4f8708-9994-57d3-c2d7-b892156e7812=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5]

> flink-runtime-web failed due to a corrupted 
> 
>
> Key: FLINK-30719
> URL: https://issues.apache.org/jira/browse/FLINK-30719
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend, Test Infrastructure, Tests
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44954=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=54421a62-0c80-5aad-3319-094ff69180bb=12550
> The build failed due to a corrupted nodejs dependency:
> {code}
> [ERROR] The archive file 
> /__w/1/.m2/repository/com/github/eirslett/node/16.13.2/node-16.13.2-linux-x64.tar.gz
>  is corrupted and will be deleted. Please try the build again.
> {code}





[GitHub] [flink] chucheng92 commented on pull request #22193: [BP-1.17][FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not quit

2023-03-16 Thread via GitHub


chucheng92 commented on PR #22193:
URL: https://github.com/apache/flink/pull/22193#issuecomment-1473015869

   > @chucheng92 please see my comments in the original PR 
[apache/flink-connector-kafka#8](https://github.com/apache/flink-connector-kafka/pull/8)
 before merging this one.
   
   Thanks @RamanVerma. This is already fixed.





[jira] [Resolved] (FLINK-31477) NestedLoopJoinTest.testLeftOuterJoinWithFilter failed on azure

2023-03-16 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-31477.

Resolution: Fixed

Fixed in release-1.16 : 1bd25a48fa444390971149515810d057324b642b

> NestedLoopJoinTest.testLeftOuterJoinWithFilter failed on azure 
> ---
>
> Key: FLINK-31477
> URL: https://issues.apache.org/jira/browse/FLINK-31477
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.1
>Reporter: Leonard Xu
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.2
>
>
> {noformat}
>  Failures: 
> Mar 15 15:52:32 [ERROR]   NestedLoopJoinTest.testLeftOuterJoinWithFilter1:37 
> optimized exec plan expected:<...[InnerJoin], where=[[true], select=[a, e, 
> f], build=[left])
> Mar 15 15:52:32:- Exchange(distribution=[broadcast])
> Mar 15 15:52:32:  +- Calc(select=[a], where=[(a = 10)])
> Mar 15 15:52:32: +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, 
> b, c])
> Mar 15 15:52:32+- Calc(select=[e, f], where=[(d = 10])])
> Mar 15 15:52:32   +- LegacyT...> but was:<...[InnerJoin], where=[[(a = 
> d)], select=[a, d, e, f], build=[left])
> Mar 15 15:52:32:- Exchange(distribution=[broadcast])
> Mar 15 15:52:32:  +- Calc(select=[a], where=[SEARCH(a, Sarg[10])])
> Mar 15 15:52:32: +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, 
> b, c])
> Mar 15 15:52:32+- Calc(select=[d, e, f], where=[SEARCH(d, Sarg[10]])])
> Mar 15 15:52:32   +- LegacyT...>{noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47202&view=logs&j=086353db-23b2-5446-2315-18e660618ef2&t=6cd785f3-2a2e-58a8-8e69-b4a03be28843





[GitHub] [flink] leonardBang merged pull request #22188: [FLINK-31477][table-planner] Fix NestedLoopJoinTest.testLeftOuterJoin failed with unexpected plan

2023-03-16 Thread via GitHub


leonardBang merged PR #22188:
URL: https://github.com/apache/flink/pull/22188





[GitHub] [flink] liuyongvs commented on pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.

2023-03-16 Thread via GitHub


liuyongvs commented on PR #22144:
URL: https://github.com/apache/flink/pull/22144#issuecomment-1473012377

   @snuyanzin I rebased to fix the conflicts and addressed your review comments.





[GitHub] [flink] liuyongvs commented on a diff in pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.

2023-03-16 Thread via GitHub


liuyongvs commented on code in PR #22144:
URL: https://github.com/apache/flink/pull/22144#discussion_r1139573794


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java:
##
@@ -1359,6 +1360,16 @@ public OutType arrayDistinct() {
 return toApiSpecificExpression(unresolvedCall(ARRAY_DISTINCT, 
toExpr()));
 }
 
+/**
+ * Remove all elements that equal to element from array.

Review Comment:
   fixed






[GitHub] [flink] liuyongvs commented on a diff in pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.

2023-03-16 Thread via GitHub


liuyongvs commented on code in PR #22144:
URL: https://github.com/apache/flink/pull/22144#discussion_r1139573434


##
docs/data/sql_functions.yml:
##
@@ -617,6 +617,9 @@ collection:
   - sql: ARRAY_DISTINCT(haystack)
 table: haystack.arrayDistinct()
 description: Returns an array with unique elements. If the array itself is 
null, the function will return null. Keeps ordering of elements.
+  - sql: ARRAY_REMOVE(haystack, needle)
+table: haystack.arrayRemove(needle)
+description: Remove all elements that equal to element from array. If the 
array itself is null, the function will return null. Keeps ordering of elements.

Review Comment:
   Fixed.



##
flink-python/pyflink/table/expression.py:
##
@@ -1487,6 +1487,13 @@ def array_distinct(self) -> 'Expression':
 """
 return _binary_op("arrayDistinct")(self)
 
+def array_remove(self, needle) -> 'Expression':
+"""
+Remove all elements that equal to element from array.

Review Comment:
   fixed
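
For readers following the ARRAY_REMOVE thread, the behaviour described in the docs entry quoted above (remove every element equal to the needle, return null for a null array, keep ordering) can be modelled in a few lines of plain Python. This is only a sketch of the documented semantics, not the Flink implementation: the function name `array_remove` mirrors the pyflink method in the diff, but the body is hypothetical, and how a null needle is treated is not stated in the thread, so the sketch simply falls back on Python equality.

```python
from typing import Any, List, Optional


def array_remove(haystack: Optional[List[Any]], needle: Any) -> Optional[List[Any]]:
    """Plain-Python model of the ARRAY_REMOVE description quoted in the review."""
    # A null array yields null, as the docs entry states.
    if haystack is None:
        return None
    # Keep every element that differs from the needle; order is preserved.
    # Assumption: comparison uses Python equality, which may differ from SQL
    # three-valued logic when nulls are involved.
    return [element for element in haystack if element != needle]
```

For example, `array_remove([1, 2, 1, 3], 1)` drops both occurrences of 1 and leaves the remaining elements in their original order.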






[GitHub] [flink-connector-kafka] chucheng92 commented on a diff in pull request #8: [FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not qui

2023-03-16 Thread via GitHub


chucheng92 commented on code in PR #8:
URL: 
https://github.com/apache/flink-connector-kafka/pull/8#discussion_r1139572887


##
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##
@@ -166,6 +174,68 @@ public void testReaderRegistrationTriggersAssignments() 
throws Throwable {
 }
 }
 
+@Test
+public void testRunWithDiscoverPartitionsOnceToCheckNoMoreSplit() throws 
Throwable {
+try (MockSplitEnumeratorContext context =
+new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+KafkaSourceEnumerator enumerator =
+createEnumerator(context, 
DISABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+// Start the enumerator and it should schedule a one time task to 
discover and assign
+// partitions.
+enumerator.start();
+
+// Run the partition discover callable and check the partition 
assignment.
+runOneTimePartitionDiscovery(context);
+
+// enumerator noMoreNewPartitionSplits first will be false, when 
execute
+// handlePartitionSplitChanges will be set true
+assertThat((Boolean) Whitebox.getInternalState(enumerator, 
"noMoreNewPartitionSplits"))
+.isTrue();
+}
+}
+
+@Test
+public void testRunWithPeriodicPartitionDiscoveryOnceToCheckNoMoreSplit() 
throws Throwable {
+try (MockSplitEnumeratorContext context =
+new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+KafkaSourceEnumerator enumerator =
+createEnumerator(context, 
ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+// Start the enumerator and it should schedule a one time task to 
discover and assign
+// partitions.
+enumerator.start();
+assertThat(context.getOneTimeCallables()).isEmpty();
+assertThat(context.getPeriodicCallables())
+.as("A periodic partition discovery callable should have 
been scheduled")
+.hasSize(1);
+
+// enumerator noMoreNewPartitionSplits first will be false, even 
when execute
+// handlePartitionSplitChanges it still be false
+assertThat((Boolean) Whitebox.getInternalState(enumerator, 
"noMoreNewPartitionSplits"))
+.isFalse();
+}
+}
+
+@Test
+public void 
testRunWithDiscoverPartitionsOnceWithZeroMsToCheckNoMoreSplit() throws 
Throwable {
+try (MockSplitEnumeratorContext context =
+new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+// set partitionDiscoveryIntervalMs = 0
+KafkaSourceEnumerator enumerator = createEnumerator(context, 
0L)) {
+
+// Start the enumerator, and it should schedule a one time task to 
discover and assign
+// partitions.
+enumerator.start();
+runOneTimePartitionDiscovery(context);

Review Comment:
   Yes. good advice. I have fixed it. 
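
The three tests quoted above check one flag under three interval settings. A toy model of that expectation, written in Python for brevity, might look like the following. It is a sketch under stated assumptions rather than the connector's code: the class `EnumeratorModel` and the helper `flag_after_first_discovery` are hypothetical, and the rule that only a strictly positive interval enables periodic discovery is inferred from the PR title and the `0L` test case.

```python
class EnumeratorModel:
    """Toy stand-in for the enumerator's discovery flag; not the connector code."""

    def __init__(self, partition_discovery_interval_ms: int) -> None:
        self.partition_discovery_interval_ms = partition_discovery_interval_ms
        # Starts out false, as the test comments describe.
        self.no_more_new_partition_splits = False

    def periodic_discovery_enabled(self) -> bool:
        # Assumption: only a strictly positive interval enables periodic discovery,
        # so 0 behaves like the disabled setting.
        return self.partition_discovery_interval_ms > 0

    def handle_partition_split_changes(self) -> None:
        # After a one-time discovery no further partitions will be found,
        # so a bounded source is free to finish.
        if not self.periodic_discovery_enabled():
            self.no_more_new_partition_splits = True


def flag_after_first_discovery(interval_ms: int) -> bool:
    """Run one simulated discovery round and report the flag."""
    model = EnumeratorModel(interval_ms)
    model.handle_partition_split_changes()
    return model.no_more_new_partition_splits
```

With this model, a negative interval and a zero interval both end with the flag set, while a positive interval leaves it unset, which matches the three assertions in the diff.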






[GitHub] [flink-connector-kafka] chucheng92 commented on a diff in pull request #8: [FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not qui

2023-03-16 Thread via GitHub


chucheng92 commented on code in PR #8:
URL: 
https://github.com/apache/flink-connector-kafka/pull/8#discussion_r1139572140


##
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##
@@ -166,6 +174,68 @@ public void testReaderRegistrationTriggersAssignments() 
throws Throwable {
 }
 }
 
+@Test
+public void testRunWithDiscoverPartitionsOnceToCheckNoMoreSplit() throws 
Throwable {
+try (MockSplitEnumeratorContext context =
+new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+KafkaSourceEnumerator enumerator =
+createEnumerator(context, 
DISABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+// Start the enumerator and it should schedule a one time task to 
discover and assign
+// partitions.
+enumerator.start();
+
+// Run the partition discover callable and check the partition 
assignment.
+runOneTimePartitionDiscovery(context);
+
+// enumerator noMoreNewPartitionSplits first will be false, when 
execute
+// handlePartitionSplitChanges will be set true
+assertThat((Boolean) Whitebox.getInternalState(enumerator, 
"noMoreNewPartitionSplits"))
+.isTrue();
+}
+}
+
+@Test
+public void testRunWithPeriodicPartitionDiscoveryOnceToCheckNoMoreSplit() 
throws Throwable {
+try (MockSplitEnumeratorContext context =
+new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+KafkaSourceEnumerator enumerator =
+createEnumerator(context, 
ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+// Start the enumerator and it should schedule a one time task to 
discover and assign
+// partitions.
+enumerator.start();
+assertThat(context.getOneTimeCallables()).isEmpty();
+assertThat(context.getPeriodicCallables())
+.as("A periodic partition discovery callable should have 
been scheduled")
+.hasSize(1);
+
+// enumerator noMoreNewPartitionSplits first will be false, even 
when execute

Review Comment:
   @RamanVerma Thanks, I added it.






[jira] [Closed] (FLINK-31465) [Flink] Fix shortcode errors in docs

2023-03-16 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-31465.

Fix Version/s: table-store-0.4.0
 Assignee: Ming Li
   Resolution: Fixed

master: dd05a70d0b66fd3bbf1afe0cd1e8362405f024c7

> [Flink] Fix shortcode errors in docs
> 
>
> Key: FLINK-31465
> URL: https://issues.apache.org/jira/browse/FLINK-31465
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Ming Li
>Assignee: Ming Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.4.0
>
>
> When running docs with hugo, I get the following exception:
> {code:java}
> hugo v0.111.3+extended darwin/amd64 BuildDate=unknown
> Error: Error building site: 
> "/xxx/flink-table-store/docs/content/docs/how-to/writing-tables.md:303:1": 
> failed to extract shortcode: shortcode "tabs" must be closed or 
> self-closed{code}





[jira] [Closed] (FLINK-31321) Yarn-session mode, securityConfiguration supports dynamic configuration

2023-03-16 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song closed FLINK-31321.

Fix Version/s: 1.17.0
   Resolution: Fixed

- master (1.18): 6f9bca971f69525f9be7779121706bfb5756089d
- release-1.17: 17ab9fda0bcd2687275ea8af2215b939d644cb07

> Yarn-session mode, securityConfiguration supports dynamic configuration
> ---
>
> Key: FLINK-31321
> URL: https://issues.apache.org/jira/browse/FLINK-31321
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.17.0
>Reporter: felixzh
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> When different tenants submit jobs using the same {_}flink-conf.yaml{_}, the 
> same user is displayed on the Yarn page.
> _SecurityConfiguration_ does not support dynamic configuration. Therefore, 
> the user displayed on the Yarn page is the 
> _security.kerberos.login.principal_ in the {_}flink-conf.yaml{_}.
> FLINK-29435 only fixed the CliFrontend class (which corresponds to the flink script).
> The FlinkYarnSessionCli class (which corresponds to the yarn-session.sh script) still 
> has this problem.
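
The idea behind "dynamic configuration" in this issue is that `-Dkey=value` arguments should take precedence over the shared flink-conf.yaml, so each tenant can supply its own principal. A minimal Python sketch of that overlay follows. It is an illustration only, not Flink's code: the helper `apply_dynamic_properties` and the example principals are hypothetical, and the only name taken from the issue is the `security.kerberos.login.principal` key.

```python
from typing import Dict, List


def apply_dynamic_properties(base: Dict[str, str], args: List[str]) -> Dict[str, str]:
    """Overlay -Dkey=value arguments on a base configuration (hypothetical helper)."""
    # Copy so the shared base configuration is never mutated.
    effective = dict(base)
    for arg in args:
        # Only arguments of the form -Dkey=value are treated as dynamic properties.
        if arg.startswith("-D") and "=" in arg:
            key, value = arg[2:].split("=", 1)
            effective[key] = value
    return effective


# Example values below are made up for illustration.
shared_conf = {"security.kerberos.login.principal": "shared@EXAMPLE.COM"}
tenant_conf = apply_dynamic_properties(
    shared_conf, ["-Dsecurity.kerberos.login.principal=tenant-a@EXAMPLE.COM"]
)
```

In this sketch the security settings would be built from `tenant_conf` rather than `shared_conf`, which is the behaviour the issue asks for in yarn-session mode.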





[jira] [Assigned] (FLINK-31321) Yarn-session mode, securityConfiguration supports dynamic configuration

2023-03-16 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song reassigned FLINK-31321:


Assignee: felixzh

> Yarn-session mode, securityConfiguration supports dynamic configuration
> ---
>
> Key: FLINK-31321
> URL: https://issues.apache.org/jira/browse/FLINK-31321
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.17.0
>Reporter: felixzh
>Assignee: felixzh
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> When different tenants submit jobs using the same {_}flink-conf.yaml{_}, the 
> same user is displayed on the Yarn page.
> _SecurityConfiguration_ does not support dynamic configuration. Therefore, 
> the user displayed on the Yarn page is the 
> _security.kerberos.login.principal_ in the {_}flink-conf.yaml{_}.
> FLINK-29435 only fixed the CliFrontend class (which corresponds to the flink script).
> The FlinkYarnSessionCli class (which corresponds to the yarn-session.sh script) still 
> has this problem.





[jira] [Closed] (FLINK-31253) Port itcases to Flink 1.15 and 1.14

2023-03-16 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-31253.

Resolution: Fixed

master: 7663615366a16d3481c67aa093a5f95973ae552e

> Port itcases to Flink 1.15 and 1.14
> ---
>
> Key: FLINK-31253
> URL: https://issues.apache.org/jira/browse/FLINK-31253
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Chao Tian
>Priority: Major
>  Labels: pull-request-available, starter
> Fix For: table-store-0.4.0
>
>
> At present, only the common module has tests. We need to copy some of the 
> ITCases to 1.14 and 1.15 to ensure they work correctly.





[GitHub] [flink] xintongsong closed pull request #22119: [FLINK-31321][Deployment/YARN] Yarn-session mode, securityConfiguration supports dynamic configuration

2023-03-16 Thread via GitHub


xintongsong closed pull request #22119: [FLINK-31321][Deployment/YARN] 
Yarn-session mode, securityConfiguration supports dynamic configuration
URL: https://github.com/apache/flink/pull/22119





[GitHub] [flink] flinkbot commented on pull request #22197: [FLINK-31487][table-planner] Add targetColumns to DynamicTableSink#Context

2023-03-16 Thread via GitHub


flinkbot commented on PR #22197:
URL: https://github.com/apache/flink/pull/22197#issuecomment-1472992082

   
   ## CI report:
   
   * cf28dc6603d3041ec88c7e8f137324201916032f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-31487) Add targetColumns to DynamicTableSink#Context

2023-03-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31487:
---
Labels: pull-request-available  (was: )

> Add targetColumns to DynamicTableSink#Context
> -
>
> Key: FLINK-31487
> URL: https://issues.apache.org/jira/browse/FLINK-31487
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> FLIP-300: Add targetColumns to DynamicTableSink#Context to solve the null 
> overwrite problem of partial-insert
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240885081





[GitHub] [flink] lincoln-lil closed pull request #22041: [FLINK-31487][table-planner] Add targetColumns to DynamicTableSink#Context

2023-03-16 Thread via GitHub


lincoln-lil closed pull request #22041: [FLINK-31487][table-planner] Add 
targetColumns to DynamicTableSink#Context
URL: https://github.com/apache/flink/pull/22041





[GitHub] [flink] lincoln-lil opened a new pull request, #22197: [FLINK-31487][table-planner] Add targetColumns to DynamicTableSink#Context

2023-03-16 Thread via GitHub


lincoln-lil opened a new pull request, #22197:
URL: https://github.com/apache/flink/pull/22197

   ## What is the purpose of the change
   The internal SinkRuntimeProviderContext will support a new constructor with a 
targetColumns parameter, which connectors can use to recognize the 
user-specified column list.
   
   Note: nested columns in the column list of an insert/update statement 
are currently unsupported (as described in 
[FLINK-31301](https://issues.apache.org/jira/browse/FLINK-31301) & 
[FLINK-31344](https://issues.apache.org/jira/browse/FLINK-31344)), so this 
PR supports simple columns first; nested columns can be supported after 
the two issues have been fixed.
   
   ## Brief change log
   * add new getTargetColumns to DynamicTableSink#Context
   * add targetColumns info to related relnodes and execnodes
   * add related tests
   
   ## Verifying this change
   * add partial-insert related cases, including unit tests and ITCases
   
   ## Does this pull request potentially affect one of the following parts:
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
@Public(Evolving): (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (java-docs)
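
To illustrate the "null overwrite problem of partial-insert" that this change targets, here is a small Python model of a keyed sink. It is a sketch under assumptions, not the Flink API: the function `upsert`, the in-memory `store`, and the flat list of column indexes are all hypothetical, and the actual shape of the target-columns value exposed by DynamicTableSink#Context is not shown in this thread.

```python
from typing import Any, Dict, List, Optional, Sequence


def upsert(
    store: Dict[Any, List[Any]],
    key: Any,
    row: Sequence[Any],
    target_columns: Optional[Sequence[int]] = None,
) -> None:
    """Write a row into a keyed store, optionally touching only the target columns."""
    existing = store.get(key)
    if existing is None or target_columns is None:
        # Without column information the sink can only write the full row,
        # including the nulls that were padded in for unspecified columns.
        store[key] = list(row)
        return
    # With column information, only the user-specified columns are updated.
    merged = list(existing)
    for index in target_columns:
        merged[index] = row[index]
    store[key] = merged


# A statement that sets only columns 0 and 1; column 2 is padded with None.
partial_row = [1, "new", None]

without_info = {1: [1, "old", 9.5]}
upsert(without_info, 1, partial_row)

with_info = {1: [1, "old", 9.5]}
upsert(with_info, 1, partial_row, target_columns=[0, 1])
```

In the first store the existing value 9.5 is lost to the padded null; in the second it survives because the sink knows which columns the statement actually named.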





[jira] [Commented] (FLINK-31486) Using KeySelector in IterationBody causes ClassNotFoundException

2023-03-16 Thread Zhipeng Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17701452#comment-17701452
 ] 

Zhipeng Zhang commented on FLINK-31486:
---

Is this similar to this one [1]?

 

 [1] https://issues.apache.org/jira/browse/FLINK-31255

> Using KeySelector in IterationBody causes ClassNotFoundException
> 
>
> Key: FLINK-31486
> URL: https://issues.apache.org/jira/browse/FLINK-31486
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Reporter: Jiang Xin
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.2.0
>
>
> When we use CoGroup along with KeySelector in an IterationBody, the following 
> exception occurs.
> {code:java}
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: 
> Could not instantiate state partitioner. at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662)
>  at 
> org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96)
>  at 
> org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168)
>  at 
> org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146)
>  at 
> org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68)
>  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>  at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>  at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>  at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>  at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at 
> java.lang.Thread.run(Thread.java:748) Caused by: 
> java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1
>  of type org.apache.flink.api.java.functions.KeySelector in instance of 
> org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector 
> at 
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
>  at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432) 
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409) 
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403) at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at 
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:501) at 
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:459) at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
>  at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
>  at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
>  at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
>  at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659)
>  ... 17 more {code}





[jira] [Commented] (FLINK-22484) Built-in functions for collections

2023-03-16 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17701423#comment-17701423
 ] 

Sergey Nuyanzin commented on FLINK-22484:
-

{{MAP_KEYS}}, {{MAP_VALUES}}, {{MAP_FROM_ARRAYS}} merged at 
[e42d7089b24f4e93e980ce724faf00341660bcd6|https://github.com/apache/flink/commit/e42d7089b24f4e93e980ce724faf00341660bcd6]

> Built-in functions for collections
> --
>
> Key: FLINK-22484
> URL: https://issues.apache.org/jira/browse/FLINK-22484
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> A number of built-in functions for working with collections are 
> supported by other vendors. After looking at PostgreSQL, BigQuery, and Spark, 
> a list of more or less generic functions for collections was selected 
> (for more details see [1]).
> Feedback on the doc is welcome.
> [1] 
> [https://docs.google.com/document/d/1nS0Faur9CCop4sJoQ2kMQ2XU1hjg1FaiTSQp2RsZKEE/edit?usp=sharing]
> MAP_KEYS
> MAP_VALUES
> MAP_FROM_ARRAYS
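
As a rough guide to what the three functions listed above are expected to do, the following Python sketch models them on plain dictionaries and lists. It is not the Flink implementation: the lowercase function names are hypothetical, and both the null handling and the error raised for arrays of different lengths are assumptions, since the thread does not spell them out.

```python
from typing import Any, Dict, List, Optional


def map_keys(mapping: Optional[Dict[Any, Any]]) -> Optional[List[Any]]:
    """Model of MAP_KEYS: the keys of a map as an array."""
    # Assumption: a null map yields null.
    return None if mapping is None else list(mapping.keys())


def map_values(mapping: Optional[Dict[Any, Any]]) -> Optional[List[Any]]:
    """Model of MAP_VALUES: the values of a map as an array."""
    return None if mapping is None else list(mapping.values())


def map_from_arrays(
    keys: Optional[List[Any]], values: Optional[List[Any]]
) -> Optional[Dict[Any, Any]]:
    """Model of MAP_FROM_ARRAYS: pair up a key array and a value array."""
    if keys is None or values is None:
        return None
    # Assumption: arrays of different lengths are rejected rather than truncated.
    if len(keys) != len(values):
        raise ValueError("key and value arrays must have the same length")
    return dict(zip(keys, values))
```

Python dictionaries keep insertion order, so the sketch returns keys and values in that order; callers should not assume the same ordering guarantee from a SQL engine.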





[jira] [Resolved] (FLINK-22484) Built-in functions for collections

2023-03-16 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin resolved FLINK-22484.
-
Fix Version/s: 1.18.0
   Resolution: Fixed

> Built-in functions for collections
> --
>
> Key: FLINK-22484
> URL: https://issues.apache.org/jira/browse/FLINK-22484
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> A number of built-in functions for working with collections are 
> supported by other vendors. After looking at PostgreSQL, BigQuery, and Spark, 
> a list of more or less generic functions for collections was selected 
> (for more details see [1]).
> Feedback on the doc is welcome.
> [1] 
> [https://docs.google.com/document/d/1nS0Faur9CCop4sJoQ2kMQ2XU1hjg1FaiTSQp2RsZKEE/edit?usp=sharing]
> MAP_KEYS
> MAP_VALUES
> MAP_FROM_ARRAYS





[jira] [Assigned] (FLINK-22484) Built-in functions for collections

2023-03-16 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin reassigned FLINK-22484:
---

Assignee: Sergey Nuyanzin

> Built-in functions for collections
> --
>
> Key: FLINK-22484
> URL: https://issues.apache.org/jira/browse/FLINK-22484
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> A number of built-in functions for working with collections are 
> supported by other vendors. After looking at PostgreSQL, BigQuery, and Spark, 
> a list of more or less generic functions for collections was selected 
> (for more details see [1]).
> Feedback on the doc is welcome.
> [1] 
> [https://docs.google.com/document/d/1nS0Faur9CCop4sJoQ2kMQ2XU1hjg1FaiTSQp2RsZKEE/edit?usp=sharing]
> MAP_KEYS
> MAP_VALUES
> MAP_FROM_ARRAYS





[jira] [Commented] (FLINK-30896) Reduce usage of CatalogViewImpl in planner

2023-03-16 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17701422#comment-17701422
 ] 

Sergey Nuyanzin commented on FLINK-30896:
-

Merged as 
[81f77a9d0e7f49a1786eb21e0adb1290306b89c5|https://github.com/apache/flink/commit/81f77a9d0e7f49a1786eb21e0adb1290306b89c5]

> Reduce usage of CatalogViewImpl in planner
> --
>
> Key: FLINK-30896
> URL: https://issues.apache.org/jira/browse/FLINK-30896
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Most of the work was done under 
> https://issues.apache.org/jira/browse/FLINK-21801
>  
> However there are still some usages of {{CatalogViewImpl}}





[jira] [Resolved] (FLINK-30896) Reduce usage of CatalogViewImpl in planner

2023-03-16 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin resolved FLINK-30896.
-
Resolution: Fixed

> Reduce usage of CatalogViewImpl in planner
> --
>
> Key: FLINK-30896
> URL: https://issues.apache.org/jira/browse/FLINK-30896
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Most of the work was done under 
> https://issues.apache.org/jira/browse/FLINK-21801
>  
> However there are still some usages of {{CatalogViewImpl}}





[jira] [Updated] (FLINK-30896) Reduce usage of CatalogViewImpl in planner

2023-03-16 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-30896:

Fix Version/s: 1.18.0

> Reduce usage of CatalogViewImpl in planner
> --
>
> Key: FLINK-30896
> URL: https://issues.apache.org/jira/browse/FLINK-30896
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Most of the work was done under 
> https://issues.apache.org/jira/browse/FLINK-21801
>  
> However there are still some usages of {{CatalogViewImpl}}





[jira] [Assigned] (FLINK-30896) Reduce usage of CatalogViewImpl in planner

2023-03-16 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin reassigned FLINK-30896:
---

Assignee: Sergey Nuyanzin

> Reduce usage of CatalogViewImpl in planner
> --
>
> Key: FLINK-30896
> URL: https://issues.apache.org/jira/browse/FLINK-30896
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> Most of the work was done under 
> https://issues.apache.org/jira/browse/FLINK-21801
>  
> However there are still some usages of {{CatalogViewImpl}}





[GitHub] [flink] snuyanzin merged pull request #21847: [FLINK-30896][table] Reduce usage of CatalogViewImpl in table-planner

2023-03-16 Thread via GitHub


snuyanzin merged PR #21847:
URL: https://github.com/apache/flink/pull/21847


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin merged pull request #15797: [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, MAP_FROM_ARRA…

2023-03-16 Thread via GitHub


snuyanzin merged PR #15797:
URL: https://github.com/apache/flink/pull/15797





[GitHub] [flink] snuyanzin commented on a diff in pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.

2023-03-16 Thread via GitHub


snuyanzin commented on code in PR #22144:
URL: https://github.com/apache/flink/pull/22144#discussion_r1139402152


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java:
##
@@ -1359,6 +1360,16 @@ public OutType arrayDistinct() {
 return toApiSpecificExpression(unresolvedCall(ARRAY_DISTINCT, 
toExpr()));
 }
 
+/**
+ * Remove all elements that equal to element from array.

Review Comment:
   ```suggestion
* Removes all elements that equal to element from array.
   ```






[GitHub] [flink] snuyanzin commented on a diff in pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.

2023-03-16 Thread via GitHub


snuyanzin commented on code in PR #22144:
URL: https://github.com/apache/flink/pull/22144#discussion_r1139401943


##
flink-python/pyflink/table/expression.py:
##
@@ -1487,6 +1487,13 @@ def array_distinct(self) -> 'Expression':
 """
 return _binary_op("arrayDistinct")(self)
 
+def array_remove(self, needle) -> 'Expression':
+"""
+Remove all elements that equal to element from array.

Review Comment:
   ```suggestion
   Removes all elements that equal to element from array.
   ```






[GitHub] [flink] snuyanzin commented on a diff in pull request #22144: [FLINK-31102][table] Add ARRAY_REMOVE function.

2023-03-16 Thread via GitHub


snuyanzin commented on code in PR #22144:
URL: https://github.com/apache/flink/pull/22144#discussion_r1139401795


##
docs/data/sql_functions.yml:
##
@@ -617,6 +617,9 @@ collection:
   - sql: ARRAY_DISTINCT(haystack)
 table: haystack.arrayDistinct()
 description: Returns an array with unique elements. If the array itself is 
null, the function will return null. Keeps ordering of elements.
+  - sql: ARRAY_REMOVE(haystack, needle)
+table: haystack.arrayRemove(needle)
+description: Remove all elements that equal to element from array. If the 
array itself is null, the function will return null. Keeps ordering of elements.

Review Comment:
   Sorry, looks like I missed it 
   ```suggestion
   description: Removes all elements that equal to element from array. If 
the array itself is null, the function will return null. Keeps ordering of 
elements.
   ```
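
   The described semantics (drop every element equal to the needle, return null for a null array, keep the order of the remaining elements) can be sketched as below. This is only an illustration under assumptions: `ArrayRemoveSketch` and `arrayRemove` are hypothetical names, not Flink's implementation, and comparing with `Objects.equals` (so that a null needle removes null elements) is an assumption the description does not confirm.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical helper illustrating the documented ARRAY_REMOVE behaviour.
// It is NOT the Flink implementation.
final class ArrayRemoveSketch {

    static <T> List<T> arrayRemove(List<T> haystack, T needle) {
        // "If the array itself is null, the function will return null."
        if (haystack == null) {
            return null;
        }
        List<T> result = new ArrayList<>(haystack.size());
        for (T element : haystack) {
            // Assumption: equality is null-safe, so a null needle removes nulls.
            if (!Objects.equals(element, needle)) {
                result.add(element); // insertion order of kept elements is preserved
            }
        }
        return result;
    }
}
```

   For example, removing `1` from `[1, 2, 1, 3]` leaves `[2, 3]` in the original order.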






[jira] [Created] (FLINK-31493) helm upgrade does not work, because repo path does not follow helm standards

2023-03-16 Thread Emmanuel Leroy (Jira)
Emmanuel Leroy created FLINK-31493:
--

 Summary: helm upgrade does not work, because repo path does not 
follow helm standards
 Key: FLINK-31493
 URL: https://issues.apache.org/jira/browse/FLINK-31493
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Reporter: Emmanuel Leroy


The helm repo for flink-operator is a folder that includes the version, which 
does not follow the helm chart repo standards.

In a standard helm repo, the repo URL is the name of the product (without 
version) and then the folder includes the different versions of the chart.

This is an issue because the repo itself needs to be installed every time the 
version is upgraded, as opposed to adding the repo once and then upgrading the 
version.

When attempting to add the latest repo, helm will complain that the repo 
already exists. It is necessary to first remove the repo, and then add the 
updated one.

When trying to upgrade the chart, it doesn't work, because helm expects the 
chart of the previous version to be in the same repo, but it cannot be found in 
the newly added repo.

So the chart needs to be uninstalled, then the new one installed.

The solution is to use a common path for all versions of the chart, and 
maintain a manifest with the various versions (instead of different folders 
with different manifests)





[jira] [Assigned] (FLINK-31363) KafkaSink failed to commit transactions under EXACTLY_ONCE semantics

2023-03-16 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-31363:
---

Assignee: Tzu-Li (Gordon) Tai

> KafkaSink failed to commit transactions under EXACTLY_ONCE semantics
> 
>
> Key: FLINK-31363
> URL: https://issues.apache.org/jira/browse/FLINK-31363
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.0, 1.16.1, 1.18.0
>Reporter: lightzhao
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-03-08-10-54-51-410.png
>
>
> When KafkaSink starts Exactly once and no data is written to the topic during 
> a checkpoint, the transaction commit exception is triggered, with the 
> following exception.
> [Transiting to fatal error state due to 
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer 
> attempted a transactional operation in an invalid state.]





[jira] [Commented] (FLINK-31363) KafkaSink failed to commit transactions under EXACTLY_ONCE semantics

2023-03-16 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701390#comment-17701390
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-31363:
-

[~lightzhao] I'm preparing a fix for this.

I think the correct fix is to *not* add a txn's metadata to the checkpoint if 
no data was written to the transaction.
I.e., the checkpoints will only reflect metadata of txns that actually have data 
that needs to be committed.

This way, creation of recovery producers can always safely set the 
{{transactionStarted}} flag to {{true}}
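
A minimal sketch of that idea, under stated assumptions: `TxnCheckpointSketch`, `OpenTxn`, `hasRecords` and `committableIds` are hypothetical names made up for illustration and are not the KafkaSink classes. It only shows the filtering step, i.e. that a transaction without written records never reaches the checkpointed state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of "only checkpoint txns that actually have data".
// None of these types exist in Flink or Kafka.
final class TxnCheckpointSketch {

    /** Hypothetical view of an open transaction. */
    static final class OpenTxn {
        final String transactionalId;
        final boolean hasRecords; // true once at least one record was written in this txn

        OpenTxn(String transactionalId, boolean hasRecords) {
            this.transactionalId = transactionalId;
            this.hasRecords = hasRecords;
        }
    }

    /** Returns the ids that would go into the checkpoint: empty txns are skipped. */
    static List<String> committableIds(List<OpenTxn> openTxns) {
        List<String> ids = new ArrayList<>();
        for (OpenTxn txn : openTxns) {
            if (txn.hasRecords) {
                ids.add(txn.transactionalId);
            }
        }
        return ids;
    }
}
```

With this filtering, every id restored from a checkpoint belongs to a transaction that was really started, which is what would make setting the flag unconditionally safe on recovery.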

> KafkaSink failed to commit transactions under EXACTLY_ONCE semantics
> 
>
> Key: FLINK-31363
> URL: https://issues.apache.org/jira/browse/FLINK-31363
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.0, 1.16.1, 1.18.0
>Reporter: lightzhao
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-03-08-10-54-51-410.png
>
>
> When KafkaSink starts Exactly once and no data is written to the topic during 
> a checkpoint, the transaction commit exception is triggered, with the 
> following exception.
> [Transiting to fatal error state due to 
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer 
> attempted a transactional operation in an invalid state.]





[jira] [Updated] (FLINK-31451) Flink Table Store Ecosystem: Introduce Presto Reader for table store

2023-03-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31451:
---
Labels: pull-request-available  (was: )

> Flink Table Store Ecosystem: Introduce Presto Reader for table store
> 
>
> Key: FLINK-31451
> URL: https://issues.apache.org/jira/browse/FLINK-31451
> Project: Flink
>  Issue Type: New Feature
>  Components: Table Store
>Affects Versions: table-store-0.4.0
>Reporter: Zaihu Shi
>Assignee: Zaihu Shi
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.4.0
>
>
>  Introduce Presto Reader for table store





[jira] [Commented] (FLINK-31319) Kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not quit

2023-03-16 Thread Raman Verma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701344#comment-17701344
 ] 

Raman Verma commented on FLINK-31319:
-

[~lemonjing] Can you please link all the backport PRs (1.16 and 1.17) here as 
well for easier tracking?

> Kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not 
> quit
> -
>
> Key: FLINK-31319
> URL: https://issues.apache.org/jira/browse/FLINK-31319
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.0, 1.16.1
>Reporter: Ran Tao
>Assignee: Ran Tao
>Priority: Critical
>  Labels: pull-request-available
> Attachments: image-2023-03-04-01-37-29-360.png, 
> image-2023-03-04-01-39-20-352.png, image-2023-03-04-01-40-44-124.png, 
> image-2023-03-04-01-41-55-664.png
>
>
> According to the Kafka option description, partitionDiscoveryIntervalMs <= 0 
> means that periodic discovery is disabled.
> !image-2023-03-04-01-37-29-360.png|width=781,height=147!
> The start logic of the Kafka enumerator follows this:
> !image-2023-03-04-01-39-20-352.png|width=465,height=311!
> However, the inner handlePartitionSplitChanges uses the wrong if condition (< 0):
> !image-2023-03-04-01-40-44-124.png|width=576,height=237!
>  
> As a result, noMoreNewPartitionSplits cannot be set to true when the interval is 0. 
> !image-2023-03-04-01-41-55-664.png|width=522,height=610!
> Consequently, a bounded source cannot call signalNoMoreSplits, so it never quits.
> Besides, the two branches of the if condition should be mutually exclusive.
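
The mutually exclusive condition asked for in the description can be sketched as follows. This is a hedged illustration only: `DiscoveryIntervalSketch` and its two methods are hypothetical names, not the enumerator's code; only the option semantics (an interval <= 0 means disabled) come from the description.

```java
// Hypothetical sketch: derive both decisions from one predicate so that
// "periodic discovery enabled" and "no more new splits" can never both hold
// and can never both be false for the same interval.
final class DiscoveryIntervalSketch {

    /** Periodic discovery runs only for a strictly positive interval. */
    static boolean periodicDiscoveryEnabled(long partitionDiscoveryIntervalMs) {
        return partitionDiscoveryIntervalMs > 0;
    }

    /** With discovery disabled (interval <= 0, including 0), no further splits can appear. */
    static boolean noMoreNewPartitionSplits(long partitionDiscoveryIntervalMs) {
        return !periodicDiscoveryEnabled(partitionDiscoveryIntervalMs);
    }
}
```

Because the second method is the negation of the first, an interval of 0 falls into the "disabled" branch in both places, which is the case the `< 0` check missed.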





[GitHub] [flink] RamanVerma commented on pull request #22193: [BP-1.17][FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not quit

2023-03-16 Thread via GitHub


RamanVerma commented on PR #22193:
URL: https://github.com/apache/flink/pull/22193#issuecomment-1472611409

   @chucheng92 please see my comments in the original PR 
https://github.com/apache/flink-connector-kafka/pull/8 before merging this one.





[GitHub] [flink] RamanVerma commented on pull request #22192: [BP-1.16][FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not quit

2023-03-16 Thread via GitHub


RamanVerma commented on PR #22192:
URL: https://github.com/apache/flink/pull/22192#issuecomment-1472610502

   @chucheng92 please see my comments in the original PR 
https://github.com/apache/flink-connector-kafka/pull/8 before merging this one.





[GitHub] [flink-connector-kafka] RamanVerma commented on a diff in pull request #8: [FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs=0 cause bounded source can not qui

2023-03-16 Thread via GitHub


RamanVerma commented on code in PR #8:
URL: 
https://github.com/apache/flink-connector-kafka/pull/8#discussion_r1139238012


##
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##
@@ -166,6 +174,68 @@ public void testReaderRegistrationTriggersAssignments() 
throws Throwable {
 }
 }
 
+@Test
+public void testRunWithDiscoverPartitionsOnceToCheckNoMoreSplit() throws 
Throwable {
+try (MockSplitEnumeratorContext context =
+new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+KafkaSourceEnumerator enumerator =
+createEnumerator(context, 
DISABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+// Start the enumerator and it should schedule a one time task to 
discover and assign
+// partitions.
+enumerator.start();
+
+// Run the partition discover callable and check the partition 
assignment.
+runOneTimePartitionDiscovery(context);
+
+// enumerator noMoreNewPartitionSplits first will be false, when 
execute
+// handlePartitionSplitChanges will be set true
+assertThat((Boolean) Whitebox.getInternalState(enumerator, 
"noMoreNewPartitionSplits"))
+.isTrue();
+}
+}
+
+@Test
+public void testRunWithPeriodicPartitionDiscoveryOnceToCheckNoMoreSplit() 
throws Throwable {
+try (MockSplitEnumeratorContext context =
+new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+KafkaSourceEnumerator enumerator =
+createEnumerator(context, 
ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+// Start the enumerator and it should schedule a one time task to 
discover and assign
+// partitions.
+enumerator.start();
+assertThat(context.getOneTimeCallables()).isEmpty();
+assertThat(context.getPeriodicCallables())
+.as("A periodic partition discovery callable should have 
been scheduled")
+.hasSize(1);
+
+// enumerator noMoreNewPartitionSplits first will be false, even 
when execute

Review Comment:
   I think you haven't done partition discovery in this test. You need to call 
`runPeriodicPartitionDiscovery` at least once before the check on line 215.



##
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##
@@ -166,6 +174,68 @@ public void testReaderRegistrationTriggersAssignments() 
throws Throwable {
 }
 }
 
+@Test
+public void testRunWithDiscoverPartitionsOnceToCheckNoMoreSplit() throws 
Throwable {
+try (MockSplitEnumeratorContext context =
+new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+KafkaSourceEnumerator enumerator =
+createEnumerator(context, 
DISABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+// Start the enumerator and it should schedule a one time task to 
discover and assign
+// partitions.
+enumerator.start();
+
+// Run the partition discover callable and check the partition 
assignment.
+runOneTimePartitionDiscovery(context);
+
+// enumerator noMoreNewPartitionSplits first will be false, when 
execute
+// handlePartitionSplitChanges will be set true
+assertThat((Boolean) Whitebox.getInternalState(enumerator, 
"noMoreNewPartitionSplits"))
+.isTrue();
+}
+}
+
+@Test
+public void testRunWithPeriodicPartitionDiscoveryOnceToCheckNoMoreSplit() 
throws Throwable {
+try (MockSplitEnumeratorContext context =
+new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+KafkaSourceEnumerator enumerator =
+createEnumerator(context, 
ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+// Start the enumerator and it should schedule a one time task to 
discover and assign
+// partitions.
+enumerator.start();
+assertThat(context.getOneTimeCallables()).isEmpty();
+assertThat(context.getPeriodicCallables())
+.as("A periodic partition discovery callable should have 
been scheduled")
+.hasSize(1);
+
+// enumerator noMoreNewPartitionSplits first will be false, even 
when execute
+// handlePartitionSplitChanges it still be false
+assertThat((Boolean) Whitebox.getInternalState(enumerator, 
"noMoreNewPartitionSplits"))
+.isFalse();
+}
+}
+
+@Test
+public void 
testRunWithDiscoverPartitionsOnceWithZeroMsToCheckNoMoreSplit() throws 
Throwable {
+try (MockSplitEnumeratorContext context =
+new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+  

[jira] [Updated] (FLINK-31491) Flink table planner NestedLoopJoinTest.testLeftOuterJoinWithFilter failed in branch release1.16

2023-03-16 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-31491:
---
Affects Version/s: 1.16.1

> Flink table planner NestedLoopJoinTest.testLeftOuterJoinWithFilter failed in 
> branch release1.16
> ---
>
> Key: FLINK-31491
> URL: https://issues.apache.org/jira/browse/FLINK-31491
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.1
>Reporter: Ran Tao
>Priority: Major
>
> branch release1.16 NestedLoopJoinTest.testLeftOuterJoinWithFilter failed.
> Mar 16 12:30:00 [ERROR] Failures: 
> Mar 16 12:30:00 [ERROR]   NestedLoopJoinTest.testLeftOuterJoinWithFilter1:37 
> optimized exec plan expected:<...[InnerJoin], where=[[true], select=[a, e, 
> f], build=[left])
> Mar 16 12:30:00    :- Exchange(distribution=[broadcast])
> Mar 16 12:30:00    :  +- Calc(select=[a], where=[(a = 10)])
> Mar 16 12:30:00    :     +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, 
> b, c])
> Mar 16 12:30:00    +- Calc(select=[e, f], where=[(d = 10])])
> Mar 16 12:30:00       +- LegacyT...> but was:<...[InnerJoin], where=[[(a = 
> d)], select=[a, d, e, f], build=[left])
> Mar 16 12:30:00    :- Exchange(distribution=[broadcast])
> Mar 16 12:30:00    :  +- Calc(select=[a], where=[SEARCH(a, Sarg[10])])
> Mar 16 12:30:00    :     +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, 
> b, c])
> Mar 16 12:30:00    +- Calc(select=[d, e, f], where=[SEARCH(d, Sarg[10]])])
> Mar 16 12:30:00       +- LegacyT...>
> at 
> org.apache.flink.table.planner.utils.DiffRepository.assertEquals(DiffRepository.java:438)
>     at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.assertEqualsOrExpand(TableTestBase.scala:1075)
>     at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:1008)
>     at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:849)
>     at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.verifyExecPlan(TableTestBase.scala:636)
>     at 
> org.apache.flink.table.planner.plan.batch.sql.join.NestedLoopJoinTest.testLeftOuterJoinWithFilter1(NestedLoopJoinTest.scala:37)
>  
> azure ci: 
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47244=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4]
> It can also be reproduced locally. I think the commit that caused it may be: 
> f0361c720cb18c4ae7dc669c6a5da5b09bc8f563
>  





[jira] [Closed] (FLINK-31491) Flink table planner NestedLoopJoinTest.testLeftOuterJoinWithFilter failed in branch release1.16

2023-03-16 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-31491.
--
Resolution: Duplicate

> Flink table planner NestedLoopJoinTest.testLeftOuterJoinWithFilter failed in 
> branch release1.16
> ---
>
> Key: FLINK-31491
> URL: https://issues.apache.org/jira/browse/FLINK-31491
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Ran Tao
>Priority: Major
>
> branch release1.16 NestedLoopJoinTest.testLeftOuterJoinWithFilter failed.
> Mar 16 12:30:00 [ERROR] Failures: 
> Mar 16 12:30:00 [ERROR]   NestedLoopJoinTest.testLeftOuterJoinWithFilter1:37 
> optimized exec plan expected:<...[InnerJoin], where=[[true], select=[a, e, 
> f], build=[left])
> Mar 16 12:30:00    :- Exchange(distribution=[broadcast])
> Mar 16 12:30:00    :  +- Calc(select=[a], where=[(a = 10)])
> Mar 16 12:30:00    :     +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, 
> b, c])
> Mar 16 12:30:00    +- Calc(select=[e, f], where=[(d = 10])])
> Mar 16 12:30:00       +- LegacyT...> but was:<...[InnerJoin], where=[[(a = 
> d)], select=[a, d, e, f], build=[left])
> Mar 16 12:30:00    :- Exchange(distribution=[broadcast])
> Mar 16 12:30:00    :  +- Calc(select=[a], where=[SEARCH(a, Sarg[10])])
> Mar 16 12:30:00    :     +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, 
> b, c])
> Mar 16 12:30:00    +- Calc(select=[d, e, f], where=[SEARCH(d, Sarg[10]])])
> Mar 16 12:30:00       +- LegacyT...>
> at 
> org.apache.flink.table.planner.utils.DiffRepository.assertEquals(DiffRepository.java:438)
>     at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.assertEqualsOrExpand(TableTestBase.scala:1075)
>     at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:1008)
>     at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:849)
>     at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.verifyExecPlan(TableTestBase.scala:636)
>     at 
> org.apache.flink.table.planner.plan.batch.sql.join.NestedLoopJoinTest.testLeftOuterJoinWithFilter1(NestedLoopJoinTest.scala:37)
>  
> azure ci: 
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47244=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4]
> It can also be reproduced locally. I think the commit that caused it may be: 
> f0361c720cb18c4ae7dc669c6a5da5b09bc8f563
>  





[jira] [Commented] (FLINK-31492) AWS Firehose Connector misclassifies IAM permission exceptions as retryable

2023-03-16 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701297#comment-17701297
 ] 

Danny Cranmer commented on FLINK-31492:
---

Thanks for reporting this, [~samuelsiebenmann]. I have assigned the Jira to you.

> AWS Firehose Connector misclassifies IAM permission exceptions as retryable
> ---
>
> Key: FLINK-31492
> URL: https://issues.apache.org/jira/browse/FLINK-31492
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / AWS, Connectors / Firehose
>Affects Versions: aws-connector-4.1.0
>Reporter: Samuel Siebenmann
>Priority: Major
>
> The AWS Firehose connector uses an exception classification mechanism to 
> decide if errors writing requests to AWS Firehose are fatal (i.e. 
> non-retryable) or not (i.e. retryable).
> {code:java}
> private boolean isRetryable(Throwable err) {
>     if (!FIREHOSE_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, getFatalExceptionCons())) {
>         return false;
>     }
>     if (failOnError) {
>         getFatalExceptionCons()
>                 .accept(new KinesisFirehoseException.KinesisFirehoseFailFastException(err));
>         return false;
>     }
>     return true;
> } {code}
> ([github|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java#L252])
> This exception classification mechanism compares an exception's actual type 
> with known, fatal exception types (by using Flink's 
> [FatalExceptionClassifier.withExceptionClassifier|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/throwable/FatalExceptionClassifier.java#L60]).
>   An exception is considered fatal if it is assignable to a given known fatal 
> exception 
> ([code|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java#L479]).
> The AWS Firehose SDK throws fatal IAM permission exceptions as 
> [FirehoseException|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/firehose/model/FirehoseException.html]s,
>  e.g.
> {code:java}
> software.amazon.awssdk.services.firehose.model.FirehoseException: User: 
> arn:aws:sts:::assumed-role/example-role/kiam-kiam is not 
> authorized to perform: firehose:PutRecordBatch on resource: 
> arn:aws:firehose:us-east-1::deliverystream/example-stream because 
> no identity-based policy allows the firehose:PutRecordBatch action{code}
> At the same time, certain subtypes of FirehoseException are retryable and 
> non-fatal 
> (e.g. [https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/firehose/model/LimitExceededException.html]).
> The AWS Firehose connector currently wrongly classifies the fatal IAM 
> permission exception as non-fatal. However, the current exception 
> classification mechanism does not easily handle a case where a super-type 
> should be considered fatal, but its child type shouldn't.
> To address this issue, AWS services and the AWS SDK use error codes (see e.g. 
> [Firehose's error 
> codes|https://docs.aws.amazon.com/firehose/latest/APIReference/CommonErrors.html]
>  or [S3's error 
> codes|https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList],
>  see API docs 
> [here|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/awscore/exception/AwsErrorDetails.html#errorCode()]
>  and 
> [here|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/awscore/exception/AwsServiceException.html#awsErrorDetails()])
>  to uniquely identify error conditions and to be used to handle errors by 
> type.
> The AWS Firehose connector (and other AWS connectors) currently log to debug 
> when retrying fully failed records 
> ([code|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java#L213]).
>  This makes it difficult for users to root cause the above issue without 
> enabling debug logs.
>  
>  
>  
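
Classifying by error code instead of by exception type, as the description proposes, could look roughly like the sketch below. Everything in it is an assumption made for illustration: `ServiceError` is a hypothetical stand-in for an SDK service exception that exposes an error code, `ErrorCodeClassifierSketch` is not part of the connector, and the code listed in `FATAL_CODES` is only an assumed example of a permission-related code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: decide fatality from the error code carried by the
// exception, so a fatal parent type and a retryable subtype can be told apart.
final class ErrorCodeClassifierSketch {

    /** Hypothetical stand-in for an SDK service exception with an error code. */
    static final class ServiceError extends RuntimeException {
        final String errorCode;

        ServiceError(String errorCode, String message) {
            super(message);
            this.errorCode = errorCode;
        }
    }

    // Assumption: example code treated as fatal; the real list would come from the service docs.
    static final Set<String> FATAL_CODES =
            new HashSet<>(Arrays.asList("AccessDeniedException"));

    /** Walks the cause chain and reports whether any ServiceError carries a fatal code. */
    static boolean isFatal(Throwable err) {
        for (Throwable t = err; t != null; t = t.getCause()) {
            if (t instanceof ServiceError && FATAL_CODES.contains(((ServiceError) t).errorCode)) {
                return true;
            }
        }
        return false;
    }
}
```

Because the decision depends on the code rather than on assignability, a throttling-style error of the same exception type stays retryable while the permission error is treated as fatal.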





[jira] [Assigned] (FLINK-31492) AWS Firehose Connector misclassifies IAM permission exceptions as retryable

2023-03-16 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer reassigned FLINK-31492:
-

Assignee: Samuel Siebenmann

> AWS Firehose Connector misclassifies IAM permission exceptions as retryable
> ---
>
> Key: FLINK-31492
> URL: https://issues.apache.org/jira/browse/FLINK-31492
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / AWS, Connectors / Firehose
>Affects Versions: aws-connector-4.1.0
>Reporter: Samuel Siebenmann
>Assignee: Samuel Siebenmann
>Priority: Major
>
> The AWS Firehose connector uses an exception classification mechanism to 
> decide if errors writing requests to AWS Firehose are fatal (i.e. 
> non-retryable) or not (i.e. retryable).
> {code:java}
> private boolean isRetryable(Throwable err) {
>     if (!FIREHOSE_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, getFatalExceptionCons())) {
>         return false;
>     }
>     if (failOnError) {
>         getFatalExceptionCons()
>                 .accept(new KinesisFirehoseException.KinesisFirehoseFailFastException(err));
>         return false;
>     }
>     return true;
> } {code}
> ([github|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java#L252])
> This exception classification mechanism compares an exception's actual type 
> with known, fatal exception types (by using Flink's 
> [FatalExceptionClassifier.withExceptionClassifier|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/throwable/FatalExceptionClassifier.java#L60]).
>   An exception is considered fatal if it is assignable to a given known fatal 
> exception 
> ([code|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java#L479]).
> The AWS Firehose SDK throws fatal IAM permission exceptions as 
> [FirehoseException|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/firehose/model/FirehoseException.html]s,
>  e.g.
> {code:java}
> software.amazon.awssdk.services.firehose.model.FirehoseException: User: 
> arn:aws:sts:::assumed-role/example-role/kiam-kiam is not 
> authorized to perform: firehose:PutRecordBatch on resource: 
> arn:aws:firehose:us-east-1::deliverystream/example-stream because 
> no identity-based policy allows the firehose:PutRecordBatch action{code}
> At the same time, certain subtypes of FirehoseException are retryable and 
> non-fatal 
> (e.g. [https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/firehose/model/LimitExceededException.html]).
> The AWS Firehose connector currently wrongly classifies the fatal IAM 
> permission exception as non-fatal. However, the current exception 
> classification mechanism does not easily handle a case where a super-type 
> should be considered fatal, but its child type shouldn't.
> To address this issue, AWS services and the AWS SDK use error codes (see e.g. 
> [Firehose's error 
> codes|https://docs.aws.amazon.com/firehose/latest/APIReference/CommonErrors.html]
>  or [S3's error 
> codes|https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList],
>  see API docs 
> [here|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/awscore/exception/AwsErrorDetails.html#errorCode()]
>  and 
> [here|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/awscore/exception/AwsServiceException.html#awsErrorDetails()])
>  to uniquely identify error conditions and to be used to handle errors by 
> type.
> The AWS Firehose connector (and other AWS connectors) currently log to debug 
> when retrying fully failed records 
> ([code|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java#L213]).
>  This makes it difficult for users to root cause the above issue without 
> enabling debug logs.
>  
>  
>  





[jira] [Commented] (FLINK-31490) ZooKeeperLeaderElectionTest.testLeaderShouldBeCorrectedWhenOverwritten times out

2023-03-16 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701290#comment-17701290
 ] 

Matthias Pohl commented on FLINK-31490:
---

I extracted the test-related logs and attached them to the issue for further 
investigation.

> ZooKeeperLeaderElectionTest.testLeaderShouldBeCorrectedWhenOverwritten times 
> out
> 
>
>              Key: FLINK-31490
>              URL: https://issues.apache.org/jira/browse/FLINK-31490
>          Project: Flink
>       Issue Type: Bug
>       Components: Runtime / Coordination
> Affects Versions: 1.17.0
>         Reporter: Matthias Pohl
>         Priority: Critical
>           Labels: test-stability
>      Attachments: FLINK-31490.mvn.log, FLINK-31490.zookeeper-client.log, FLINK-31490.zookeeper-server.log
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47221=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9448
> {code}
> Mar 16 02:00:54 "main" #1 prio=5 os_prio=0 tid=0x7f488800b800 nid=0x5a15 waiting on condition [0x7f488fe14000]
> Mar 16 02:00:54    java.lang.Thread.State: WAITING (parking)
> Mar 16 02:00:54      at sun.misc.Unsafe.park(Native Method)
> Mar 16 02:00:54      - parking to wait for  <0xe4065228> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> Mar 16 02:00:54      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> Mar 16 02:00:54      at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> Mar 16 02:00:54      at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> Mar 16 02:00:54      at org.apache.flink.runtime.leaderelection.TestingRetrievalBase.lambda$waitForNewLeader$0(TestingRetrievalBase.java:50)
> Mar 16 02:00:54      at org.apache.flink.runtime.leaderelection.TestingRetrievalBase$$Lambda$1377/797057570.get(Unknown Source)
> Mar 16 02:00:54      at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:150)
> Mar 16 02:00:54      at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:144)
> Mar 16 02:00:54      at org.apache.flink.runtime.leaderelection.TestingRetrievalBase.waitForNewLeader(TestingRetrievalBase.java:48)
> Mar 16 02:00:54      at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testLeaderShouldBeCorrectedWhenOverwritten(ZooKeeperLeaderElectionTest.java:479)
> [...]
> {code}





[jira] [Updated] (FLINK-31490) ZooKeeperLeaderElectionTest.testLeaderShouldBeCorrectedWhenOverwritten times out

2023-03-16 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-31490:
--
Attachment: FLINK-31490.mvn.log
            FLINK-31490.zookeeper-client.log
            FLINK-31490.zookeeper-server.log

> ZooKeeperLeaderElectionTest.testLeaderShouldBeCorrectedWhenOverwritten times 
> out
> 
>
>              Key: FLINK-31490
>              URL: https://issues.apache.org/jira/browse/FLINK-31490
>          Project: Flink
>       Issue Type: Bug
>       Components: Runtime / Coordination
> Affects Versions: 1.17.0
>         Reporter: Matthias Pohl
>         Priority: Critical
>           Labels: test-stability
>      Attachments: FLINK-31490.mvn.log, FLINK-31490.zookeeper-client.log, FLINK-31490.zookeeper-server.log
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47221=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9448
> {code}
> Mar 16 02:00:54 "main" #1 prio=5 os_prio=0 tid=0x7f488800b800 nid=0x5a15 waiting on condition [0x7f488fe14000]
> Mar 16 02:00:54    java.lang.Thread.State: WAITING (parking)
> Mar 16 02:00:54      at sun.misc.Unsafe.park(Native Method)
> Mar 16 02:00:54      - parking to wait for  <0xe4065228> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> Mar 16 02:00:54      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> Mar 16 02:00:54      at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> Mar 16 02:00:54      at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> Mar 16 02:00:54      at org.apache.flink.runtime.leaderelection.TestingRetrievalBase.lambda$waitForNewLeader$0(TestingRetrievalBase.java:50)
> Mar 16 02:00:54      at org.apache.flink.runtime.leaderelection.TestingRetrievalBase$$Lambda$1377/797057570.get(Unknown Source)
> Mar 16 02:00:54      at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:150)
> Mar 16 02:00:54      at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:144)
> Mar 16 02:00:54      at org.apache.flink.runtime.leaderelection.TestingRetrievalBase.waitForNewLeader(TestingRetrievalBase.java:48)
> Mar 16 02:00:54      at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testLeaderShouldBeCorrectedWhenOverwritten(ZooKeeperLeaderElectionTest.java:479)
> [...]
> {code}





[jira] [Created] (FLINK-31492) AWS Firehose Connector misclassifies IAM permission exceptions as retryable

2023-03-16 Thread Samuel Siebenmann (Jira)
Samuel Siebenmann created FLINK-31492:
-

         Summary: AWS Firehose Connector misclassifies IAM permission exceptions as retryable
             Key: FLINK-31492
             URL: https://issues.apache.org/jira/browse/FLINK-31492
         Project: Flink
      Issue Type: Bug
      Components: Connectors / AWS, Connectors / Firehose
Affects Versions: aws-connector-4.1.0
        Reporter: Samuel Siebenmann


The AWS Firehose connector uses an exception classification mechanism to decide 
whether errors that occur while writing requests to AWS Firehose are fatal 
(i.e. non-retryable) or not (i.e. retryable).
{code:java}
private boolean isRetryable(Throwable err) {
    if (!FIREHOSE_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, getFatalExceptionCons())) {
        return false;
    }
    if (failOnError) {
        getFatalExceptionCons()
                .accept(new KinesisFirehoseException.KinesisFirehoseFailFastException(err));
        return false;
    }

    return true;
}
{code}
([github|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java#L252])

This exception classification mechanism compares an exception's actual type 
with known fatal exception types (by using Flink's 
[FatalExceptionClassifier.withExceptionClassifier|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/throwable/FatalExceptionClassifier.java#L60]).
An exception is considered fatal if it is assignable to one of the known fatal 
exception types 
([code|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java#L479]).
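
The assignability check described above can be illustrated with a minimal, self-contained sketch. It is not Flink's or the connector's code: ServiceException and ThrottledException are hypothetical stand-ins for a service exception and one of its subtypes, and isFatal here simply returns true for a fatal error. It shows that registering a super-type as fatal also marks every subtype as fatal.

```java
import java.util.List;

public class AssignabilityClassifierSketch {

    // Hypothetical stand-ins for an SDK exception hierarchy (not real AWS classes).
    static class ServiceException extends RuntimeException {
        ServiceException(String message) {
            super(message);
        }
    }

    static class ThrottledException extends ServiceException {
        ThrottledException(String message) {
            super(message);
        }
    }

    /** Returns true if err, or any cause in its chain, is assignable to a fatal type. */
    static boolean isFatal(Throwable err, List<Class<? extends Throwable>> fatalTypes) {
        for (Throwable t = err; t != null; t = t.getCause()) {
            for (Class<? extends Throwable> fatalType : fatalTypes) {
                if (fatalType.isAssignableFrom(t.getClass())) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Registering the super-type as fatal ...
        List<Class<? extends Throwable>> fatalTypes = List.of(ServiceException.class);

        // ... classifies the super-type itself as fatal, as intended:
        System.out.println(isFatal(new ServiceException("not authorized"), fatalTypes)); // true
        // ... but also the subtype that should have stayed retryable:
        System.out.println(isFatal(new ThrottledException("slow down"), fatalTypes)); // true
        // Unrelated exceptions are not affected:
        System.out.println(isFatal(new IllegalStateException("other"), fatalTypes)); // false
    }
}
```

With type-based matching alone, the only way to keep the subtype retryable in this sketch would be to not register the super-type at all.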

The AWS Firehose SDK throws fatal IAM permission exceptions as 
[FirehoseException|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/firehose/model/FirehoseException.html]s,
 e.g.
{code:java}
software.amazon.awssdk.services.firehose.model.FirehoseException: User: arn:aws:sts:::assumed-role/example-role/kiam-kiam is not authorized to perform: firehose:PutRecordBatch on resource: arn:aws:firehose:us-east-1::deliverystream/example-stream because no identity-based policy allows the firehose:PutRecordBatch action
{code}
At the same time, certain subtypes of FirehoseException are retryable and 
non-fatal (e.g. 
[LimitExceededException|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/firehose/model/LimitExceededException.html]).

The AWS Firehose connector currently misclassifies the fatal IAM permission 
exception as non-fatal. Fixing this is not straightforward, because the 
current exception classification mechanism does not easily handle the case 
where a super-type should be considered fatal but one of its subtypes should 
not.

One way to address this issue is to use the error codes that AWS services and 
the AWS SDK provide (see e.g. 
[Firehose's error codes|https://docs.aws.amazon.com/firehose/latest/APIReference/CommonErrors.html]
 or 
[S3's error codes|https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList],
 see API docs 
[here|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/awscore/exception/AwsErrorDetails.html#errorCode()]
 and 
[here|https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/awscore/exception/AwsServiceException.html#awsErrorDetails()]),
 which uniquely identify error conditions and can be used to handle errors by 
type.
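
As a rough illustration of classifying by error code rather than by exception type, the following self-contained sketch uses a hypothetical ServiceException stand-in that carries an error code. In the real SDK the code would be read via AwsServiceException#awsErrorDetails() and AwsErrorDetails#errorCode(), as linked above. The set of fatal codes, including "AccessDeniedException", is an assumption made for this example and not a statement about what the connector should register.

```java
import java.util.Set;

public class ErrorCodeClassifierSketch {

    // Hypothetical stand-in for an SDK service exception that exposes an error code.
    static class ServiceException extends RuntimeException {
        private final String errorCode;

        ServiceException(String errorCode, String message) {
            super(message);
            this.errorCode = errorCode;
        }

        String errorCode() {
            return errorCode;
        }
    }

    // Assumed for illustration only: error codes that should fail the job.
    static final Set<String> FATAL_ERROR_CODES = Set.of("AccessDeniedException");

    /** Returns true if err, or any cause in its chain, carries a fatal error code. */
    static boolean isFatal(Throwable err) {
        for (Throwable t = err; t != null; t = t.getCause()) {
            if (t instanceof ServiceException) {
                String code = ((ServiceException) t).errorCode();
                // Set.of(...) rejects null lookups, so guard against a missing code.
                if (code != null && FATAL_ERROR_CODES.contains(code)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Same exception type, different error codes, different classification.
        Throwable denied = new ServiceException("AccessDeniedException", "not authorized");
        Throwable throttled = new ServiceException("LimitExceededException", "slow down");

        System.out.println(isFatal(denied)); // true
        System.out.println(isFatal(throttled)); // false
    }
}
```

Because the decision no longer depends on the class hierarchy, a fatal super-type and a retryable subtype can be told apart in this sketch without special-casing either class.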

The AWS Firehose connector (and other AWS connectors) currently logs at debug 
level when retrying fully failed records 
([code|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java#L213]).
 This makes it difficult for users to find the root cause of the above issue 
without enabling debug logs.

 

 

 




