[GitHub] [flink] lsyldliu commented on a diff in pull request #20855: [FLINK-29337][hive] Fix fail to query non-hive table in Hive dialect

2022-10-10 Thread GitBox


lsyldliu commented on code in PR #20855:
URL: https://github.com/apache/flink/pull/20855#discussion_r991797046


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java:
##
@@ -1689,4 +1694,61 @@ public static boolean isFromTimeStampToDecimal(RelDataType srcType, RelDataType
 return srcType.getSqlTypeName().equals(SqlTypeName.TIMESTAMP)
 && targetType.getSqlTypeName().equals(SqlTypeName.DECIMAL);
 }
+
+/**
+ * Helps to migrate the new {@link Schema} to old API methods. HiveCatalog uses the deprecated {@link
+ * TableSchema}, while other catalogs may use the new {@link Schema}. Currently, we use it to unify to
+ * {@link TableSchema}. It should be dropped after dropping {@link TableSchema}.
+ */
+public static TableSchema fromUnresolvedSchema(Schema schema) {

Review Comment:
   The `TableSchema` has been deprecated, can we use the new `Schema` instead? If 
not, I think we should create an issue to migrate `TableSchema` to `Schema` for 
Hive.



##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java:
##
@@ -520,10 +523,46 @@ public static String getUnescapedUnqualifiedTableName(HiveParserASTNode node) {
  * Get dequoted name from a table/column node.
  *
  * @param tableOrColumnNode the table or column node
- * @return for table node, db.tab or tab. for column node column.
+ * @return for table node, return the table that users specify like catalog.db.tab, db.tab or
+ * tab. For column node column, return col.
  */
-public static String getUnescapedName(HiveParserASTNode tableOrColumnNode) {
-return getUnescapedName(tableOrColumnNode, null);
+public static String getUnescapedName(HiveParserASTNode tableOrColumnNode)
+throws SemanticException {
+return getUnescapedName(tableOrColumnNode, null, null);
+}
+
+public static String getUnescapedName(
+HiveParserASTNode tableOrColumnNode, String currentCatalog, String currentDatabase)

Review Comment:
   Nit: add the `@Nullable` annotation to `currentCatalog` and `currentDatabase`.
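
   For illustration, the suggested nit might look like this (a signature-only sketch, assuming the javax.annotation.Nullable annotation commonly used in Flink; not necessarily the final code in the PR):

{code:java}
import javax.annotation.Nullable;

public static String getUnescapedName(
        HiveParserASTNode tableOrColumnNode,
        @Nullable String currentCatalog,
        @Nullable String currentDatabase)
        throws SemanticException {
    // body as in the PR; null means "no current catalog / no current database"
    throw new UnsupportedOperationException("signature sketch only");
}
{code}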



##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java:
##
@@ -1689,4 +1694,61 @@ public static boolean isFromTimeStampToDecimal(RelDataType srcType, RelDataType
 return srcType.getSqlTypeName().equals(SqlTypeName.TIMESTAMP)
 && targetType.getSqlTypeName().equals(SqlTypeName.DECIMAL);
 }
+
+/**
+ * Helps to migrate the new {@link Schema} to old API methods. HiveCatalog uses the deprecated {@link
+ * TableSchema}, while other catalogs may use the new {@link Schema}. Currently, we use it to unify to
+ * {@link TableSchema}. It should be dropped after dropping {@link TableSchema}.
+ */
+public static TableSchema fromUnresolvedSchema(Schema schema) {
+final TableSchema.Builder builder = TableSchema.builder();
+
+final DataType unresolvedType = DataTypes.TIMESTAMP(3);
+schema.getColumns().stream()
+.map(
+column -> {
+if (column instanceof Schema.UnresolvedPhysicalColumn) {
+final Schema.UnresolvedPhysicalColumn c =
+(Schema.UnresolvedPhysicalColumn) column;
+return TableColumn.physical(
+c.getName(), (DataType) c.getDataType());
+} else if (column instanceof Schema.UnresolvedMetadataColumn) {
+final Schema.UnresolvedMetadataColumn c =
+(Schema.UnresolvedMetadataColumn) column;
+return TableColumn.metadata(
+c.getName(),
+(DataType) c.getDataType(),
+c.getMetadataKey(),
+c.isVirtual());
+} else if (column instanceof Schema.UnresolvedComputedColumn) {
+final Schema.UnresolvedComputedColumn c =
+(Schema.UnresolvedComputedColumn) column;
+return TableColumn.computed(
+c.getName(),
+unresolvedType,

Review Comment:
   Why not use `DataTypes.TIMESTAMP(3)` directly?



##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSemanticAnalyzer.java:
##
@@ -2158,6 +2074,44 @@ public String getAutogenColAliasPrfxLbl() {
 return this.autogenColAliasPrf

[jira] [Closed] (FLINK-29446) Remove Calcite classes which were fixed in 1.24

2022-10-10 Thread Timo Walther (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-29446.

Fix Version/s: 1.17.0
 Assignee: Sergey Nuyanzin
   Resolution: Fixed

Fixed in master: 90f4239f4af58611ae4922a6b8d032a604cbc650

> Remove Calcite classes which were fixed in 1.24
> ---
>
> Key: FLINK-29446
> URL: https://issues.apache.org/jira/browse/FLINK-29446
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API, Table SQL / Planner
>Affects Versions: 1.16.0, 1.15.2
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> {{SqlDotOperator}}, {{SqlItemOperator}}, and {{AliasNamespace}} were introduced 
> as copies from Calcite, with a fix inside, at 
> https://github.com/apache/flink/pull/12649
> At the same time, the fix was applied in Calcite itself in 1.24: 
> https://issues.apache.org/jira/browse/CALCITE-4085
> Since Flink now depends on Calcite 1.26, the fix is already included and there is 
> no need to keep these classes in the Flink repo.





[GitHub] [flink] twalthr closed pull request #20909: [FLINK-29446][Table SQL/API] Remove Calcite classes since they were fixed in 1.24

2022-10-10 Thread GitBox


twalthr closed pull request #20909: [FLINK-29446][Table SQL/API]  Remove 
Calcite classes since they were fixed in 1.24
URL: https://github.com/apache/flink/pull/20909





[GitHub] [flink] zoucao commented on pull request #19250: [FLINK-26726][connector][hive]Hive enumerators do not assign splits to unregistered (failed) readers.

2022-10-10 Thread GitBox


zoucao commented on PR #19250:
URL: https://github.com/apache/flink/pull/19250#issuecomment-1274149231

   Sure, I will rebase onto master and fix it as soon as possible.





[GitHub] [flink] luoyuxia commented on a diff in pull request #19250: [FLINK-26726][connector][hive]Hive enumerators do not assign splits to unregistered (failed) readers.

2022-10-10 Thread GitBox


luoyuxia commented on code in PR #19250:
URL: https://github.com/apache/flink/pull/19250#discussion_r991856228


##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumeratorTest.java:
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;
+import org.apache.flink.connector.file.table.ContinuousPartitionFetcher;
+import org.apache.flink.connector.file.table.PartitionFetcher;
+import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectPath;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.TException;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Unit tests for the {@link ContinuousHiveSplitEnumerator}. */
+public class ContinuousHiveSplitEnumeratorTest {
+
+@Test
+public void testDiscoverSplitWhenNoReaderRegistered() throws Exception {
+final TestingSplitEnumeratorContext context =
+new TestingSplitEnumeratorContext<>(4);
+
+final ContinuousHiveSplitEnumerator enumerator =
+new ContinuousHiveSplitEnumerator(
+context,
+0L,
+Collections.emptySet(),
+new SimpleSplitAssigner(Collections.emptyList()),

Review Comment:
   How about using 'new SimpleSplitAssigner(Collections.singletonList(split)),'?



##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumeratorTest.java:
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;
+import org.apache.flink.connector.file.table.ContinuousPartitionFetcher;
+import org.apache.flink.connector.file.table.PartitionFetcher;
+import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectPath;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.TException;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import s

[jira] [Commented] (FLINK-29126) Fix spliting file optimization doesn't work for orc format

2022-10-10 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615497#comment-17615497
 ] 

godfrey he commented on FLINK-29126:


Fixed in master: cf70844a56a0994dfcd7fb1859408683f2b621a3
Fixed in 1.16: cbc9d462b295243a61ebc544d9cf9ff6fa2a8aa6

> Fix spliting file optimization doesn't work for orc format
> --
>
> Key: FLINK-29126
> URL: https://issues.apache.org/jira/browse/FLINK-29126
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.17.0
>
>
> FLINK-27338 tried to improve file splitting for the ORC format. But it doesn't work 
> because of a mistake in judging whether the table is stored in ORC format or 
> not. We should fix it.





[GitHub] [flink] mas-chen commented on a diff in pull request #20343: [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer…

2022-10-10 Thread GitBox


mas-chen commented on code in PR #20343:
URL: https://github.com/apache/flink/pull/20343#discussion_r991847890


##
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##
@@ -141,6 +142,73 @@ public static Object[] enableObjectReuse() {
 
TimestampData.fromInstant(Instant.parse("2021-03-30T21:00:00Z")))
 };
 
+public static final RowData[] TEST_DATA_WITH_NULL_TIMESTAMP = {
+GenericRowData.ofKind(

Review Comment:
   +1, I believe you can reuse `TEST_DATA` already defined in the class. 
`TEST_DATA` also has many records, but that's a different issue



##
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##
@@ -382,4 +543,33 @@ public RowData next() {
 }
 }
 }
+
+private class ReusableIteratorWithNullTimestamp implements Iterator {

Review Comment:
   I think the iterator is overkill here. You can even test this functionality 
with a single record. Basically, you want to confirm that a record with a null 
timestamp can be flushed.
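
   For example, a minimal single-record test might look like the sketch below (the helpers createSinkWriter(), SINK_WRITER_CONTEXT and getWrittenRecords() are placeholders for whatever this test class already provides, not actual members of it):

{code:java}
@Test
public void testFlushSingleRecordWithNullTimestamp() throws Exception {
    // placeholder helper: builds the buffered ReducingUpsertWriter under test
    final ReducingUpsertWriter<?> writer = createSinkWriter();

    // one upsert row whose timestamp column is null
    final RowData rowWithNullTimestamp =
            GenericRowData.ofKind(
                    RowKind.INSERT, 1001, StringData.fromString("ITEM_1"), null);

    writer.write(rowWithNullTimestamp, SINK_WRITER_CONTEXT);
    writer.flush(false);

    // the buffered record should be forwarded on flush despite the null timestamp
    assertThat(getWrittenRecords()).containsExactly(rowWithNullTimestamp);
}
{code}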



##
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##
@@ -297,6 +432,26 @@ public Long timestamp() {
 }
 }
 
+private void writeDataWithNullTimestamp(
+ReducingUpsertWriter writer, Iterator iterator) throws Exception {
+while (iterator.hasNext()) {
+RowData next = iterator.next();
+writer.write(

Review Comment:
   You can just call this once and invoke flush() afterwards. There's no need 
to test the buffering logic, as that is already covered by other tests.






[jira] [Assigned] (FLINK-29126) Fix spliting file optimization doesn't work for orc format

2022-10-10 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he reassigned FLINK-29126:
--

Assignee: luoyuxia

> Fix spliting file optimization doesn't work for orc format
> --
>
> Key: FLINK-29126
> URL: https://issues.apache.org/jira/browse/FLINK-29126
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> FLINK-27338 tried to improve file splitting for the ORC format. But it doesn't work 
> because of a mistake in judging whether the table is stored in ORC format or 
> not. We should fix it.





[jira] [Updated] (FLINK-29126) Fix spliting file optimization doesn't work for orc format

2022-10-10 Thread godfrey he (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-29126:
---
Fix Version/s: 1.16.0

> Fix spliting file optimization doesn't work for orc format
> --
>
> Key: FLINK-29126
> URL: https://issues.apache.org/jira/browse/FLINK-29126
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.17.0
>
>
> FLINK-27338 tried to improve file splitting for the ORC format. But it doesn't work 
> because of a mistake in judging whether the table is stored in ORC format or 
> not. We should fix it.





[GitHub] [flink] yuzelin commented on pull request #20931: [FLINK-29486][sql-client] Implement ClientParser for implementing remote SQL client later

2022-10-10 Thread GitBox


yuzelin commented on PR #20931:
URL: https://github.com/apache/flink/pull/20931#issuecomment-1274130610

   @flinkbot run azure





[GitHub] [flink] godfreyhe closed pull request #20694: [FLINK-29126][hive] Fix wrong logic for spliting file for orc format

2022-10-10 Thread GitBox


godfreyhe closed pull request #20694: [FLINK-29126][hive] Fix wrong logic for 
spliting file for orc format
URL: https://github.com/apache/flink/pull/20694





[GitHub] [flink] hlteoh37 commented on pull request #21009: [FLINK-29574] Upgrade software.amazon.glue:schema-registry-common and…

2022-10-10 Thread GitBox


hlteoh37 commented on PR #21009:
URL: https://github.com/apache/flink/pull/21009#issuecomment-1274126688

   @flinkbot run azure





[jira] [Commented] (FLINK-29432) Replace GenericUDFNvl with GenericUDFCoalesce

2022-10-10 Thread Samrat Deb (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615480#comment-17615480
 ] 

Samrat Deb commented on FLINK-29432:


Thanks [~luoyuxia] for your view on the issue!

I completely agree that there are known bugs with `GenericUDFCoalesce` and that it 
still hasn't reached full maturity. We won't go ahead with the proposed changes 
and will keep them on hold until Flink starts supporting Hive 4.

> Replace GenericUDFNvl with GenericUDFCoalesce
> -
>
> Key: FLINK-29432
> URL: https://issues.apache.org/jira/browse/FLINK-29432
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Affects Versions: 1.15.2
>Reporter: Prabhu Joseph
>Priority: Major
>  Labels: pull-request-available
>
> Hive's NVL() function has many issues, like 
> [HIVE-25193|https://issues.apache.org/jira/browse/HIVE-25193], and it was 
> retired in [HIVE-20961|https://issues.apache.org/jira/browse/HIVE-20961]. Our 
> internal Hive distribution has the fix for HIVE-20961. With this fix, the Flink 
> build fails as below because GenericUDFNvl no longer exists in Hive. It needs 
> to be replaced with GenericUDFCoalesce.
> {code}
> [INFO] 
> /codebuild/output/src366217558/src/build/flink/rpm/BUILD/flink-1.15.2/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserDefaultGraphWalker.java:
>  Recompile with -Xlint:unchecked for details.
> [INFO] -
> [ERROR] COMPILATION ERROR : 
> [INFO] -
> [ERROR] 
> /codebuild/output/src366217558/src/build/flink/rpm/BUILD/flink-1.15.2/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserTypeCheckProcFactory.java:[75,45]
>  cannot find symbol
>   symbol:   class GenericUDFNvl
>   location: package org.apache.hadoop.hive.ql.udf.generic
> [ERROR] 
> /codebuild/output/src366217558/src/build/flink/rpm/BUILD/flink-1.15.2/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserTypeCheckProcFactory.java:[1216,41]
>  cannot find symbol
>   symbol:   class GenericUDFNvl
>   location: class 
> org.apache.flink.table.planner.delegation.hive.HiveParserTypeCheckProcFactory.DefaultExprProcessor
> [ERROR] 
> /codebuild/output/src366217558/src/build/flink/rpm/BUILD/flink-1.15.2/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSemanticAnalyzer.java:[231,26]
>  constructor GlobalLimitCtx in class 
> org.apache.hadoop.hive.ql.parse.GlobalLimitCtx cannot be applied to given 
> types;
>   required: org.apache.hadoop.hive.conf.HiveConf
>   found: no arguments
>   reason: actual and formal argument lists differ in length
> [INFO] 3 errors
> {code}





[jira] [Commented] (FLINK-26999) Introduce ClickHouse Connector

2022-10-10 Thread RocMarshal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615467#comment-17615467
 ] 

RocMarshal commented on FLINK-26999:


[~rmetzger], the FLIP's current content should be considered outdated, since the new 
connector API has been approved and released after the FLIP was launched. So 
[~martijnvisser] suggests terminating the FLIP.
We'd better base the implementation on the new connector API.

Before updating the FLIP content, I think we need to do two things first:

1. Create an independent repository.

2. Collect the common problems that need to be solved when writing/sinking from 
Flink to ClickHouse, so that we can design it better.


CC [~monster#12] [~subkanthi] 

> Introduce ClickHouse Connector
> --
>
> Key: FLINK-26999
> URL: https://issues.apache.org/jira/browse/FLINK-26999
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common
>Affects Versions: 1.15.0
>Reporter: ZhuoYu Chen
>Priority: Major
>
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-202%3A+Introduce+ClickHouse+Connector]
>  





[GitHub] [flink] yuzelin commented on pull request #20931: [FLINK-29486][sql-client] Implement ClientParser for implementing remote SQL client later

2022-10-10 Thread GitBox


yuzelin commented on PR #20931:
URL: https://github.com/apache/flink/pull/20931#issuecomment-1274069957

   @flinkbot run azure





[GitHub] [flink] flinkbot commented on pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

2022-10-10 Thread GitBox


flinkbot commented on PR #21012:
URL: https://github.com/apache/flink/pull/21012#issuecomment-1274042879

   
   ## CI report:
   
   * 35800afb006d8268985089c7829eba83192a0ace UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Assigned] (FLINK-29526) Java doc mistake in SequenceNumberRange#contains()

2022-10-10 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei reassigned FLINK-29526:
--

Assignee: Feifan Wang

> Java doc mistake in SequenceNumberRange#contains()
> --
>
> Key: FLINK-29526
> URL: https://issues.apache.org/jira/browse/FLINK-29526
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Not a Priority
>  Labels: pull-request-available
> Attachments: image-2022-10-06-10-50-16-927.png
>
>
> !image-2022-10-06-10-50-16-927.png|width=554,height=106!
> Hi [~masteryhx], it seems to be a typo; I have submitted a PR for it.





[GitHub] [flink] zoltar9264 commented on pull request #20975: [FLINK-29526][state/changelog] fix java doc mistake in SequenceNumber…

2022-10-10 Thread GitBox


zoltar9264 commented on PR #20975:
URL: https://github.com/apache/flink/pull/20975#issuecomment-1274040459

   @flinkbot run azure





[GitHub] [flink] zoltar9264 commented on a diff in pull request #20975: [FLINK-29526][state/changelog] fix java doc mistake in SequenceNumber…

2022-10-10 Thread GitBox


zoltar9264 commented on code in PR #20975:
URL: https://github.com/apache/flink/pull/20975#discussion_r991782319


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/SequenceNumberRange.java:
##
@@ -29,7 +29,7 @@ public interface SequenceNumberRange {
 long size();
 
 /**
- * @return true if {@link #from} < sqn < {@link #to} (this implies that the range is not
+ * @return true if {@link #from} ≤ sqn < {@link #to} (this implies that the range is not
  * empty, i.e. to > from))

Review Comment:
   Thanks @masteryhx , you are right. 
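
   For reference, the corrected contract describes the half-open interval [from, to); a minimal check consistent with it would be the following (illustrative only, not the actual implementation):

{code:java}
// consistent with the fixed javadoc: from <= sqn < to, i.e. the range is
// half-open and is therefore non-empty only when to > from
static boolean contains(long from, long to, long sqn) {
    return from <= sqn && sqn < to;
}
{code}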






[jira] [Updated] (FLINK-29543) Jar Run Rest Handler Support Flink Configuration

2022-10-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29543:
---
Labels: pull-request-available  (was: )

> Jar Run Rest Handler Support Flink Configuration
> 
>
> Key: FLINK-29543
> URL: https://issues.apache.org/jira/browse/FLINK-29543
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / REST
>Affects Versions: 1.17.0
>Reporter: ConradJam
>Assignee: ConradJam
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Flink JM Rest Api Support Flink Configuration field





[jira] [Commented] (FLINK-27344) FLIP-222: Support full job lifecycle statements in SQL client

2022-10-10 Thread Paul Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615441#comment-17615441
 ] 

Paul Lin commented on FLINK-27344:
--

[~ekoblov] Sure. It's documented in the FLIP 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-222%3A+Support+full+job+lifecycle+statements+in+SQL+client].
 Also, you can find the discussion on the mailing list 
[https://lists.apache.org/thread/qkvh9p5w9b12s7ykh3l7lv7m9dbgnf1g].

> FLIP-222: Support full job lifecycle statements in SQL client
> -
>
> Key: FLINK-27344
> URL: https://issues.apache.org/jira/browse/FLINK-27344
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Client
>Reporter: Paul Lin
>Assignee: Paul Lin
>Priority: Major
>
> With the efforts in FLIP-24 and FLIP-91, Flink SQL client supports submitting 
> SQL jobs but lacks further support for their lifecycles afterward which is 
> crucial for streaming use cases. That means Flink SQL client users have to 
> turn to other clients (e.g. CLI) or APIs (e.g. REST API) to manage the jobs, 
> like triggering savepoints or canceling queries, which makes the user 
> experience of SQL client incomplete. 
> Therefore, this proposal aims to complete the capability of SQL client by 
> adding job lifecycle statements. With these statements, users could manage 
> jobs and savepoints through pure SQL in SQL client.





[GitHub] [flink] czy006 opened a new pull request, #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

2022-10-10 Thread GitBox


czy006 opened a new pull request, #21012:
URL: https://github.com/apache/flink/pull/21012

   # What is the purpose of the change
   
   Add a new feature from FLIP-256: support for a Flink configuration field in the JM REST API.
   
   # Brief change log
   
   - Add support for Flink configuration to the JM REST API jar-run handler
   - Add tests for this feature
   
   # Verifying this change
   
   This change added tests and can be verified as follows:
   
   Added an integration test method, testProvideFlinkConfig, in **JarHandlerParameterTest** 
to test the priority order of Flink job parameters. In this test method, the 
external parameters do not set **parallelism.default**, but **parallelism.default** 
and **task.cancellation.timeout** are set in the internal parameters (the 
flinkconfig field). Through the test we observed that **parallelism.default** from 
the flinkconfig field did not take effect, because the external parameters have 
the highest priority, and **parallelism.default** is not currently configured externally.
   
   # Does this pull request potentially affect one of the following parts:
   - Dependencies (does it add or upgrade a dependency): (no)
   - The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
   - The serializers: (no)
   - The runtime per-record code paths (performance sensitive): (no)
   - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
   - The S3 file system connector: (no)
   
   # Documentation
   Does this pull request introduce a new feature? (yes)
   If yes, how is the feature documented? (the documentation update will follow in the next PR, 
[FLINK-29544](https://issues.apache.org/jira/browse/FLINK-29544))





[GitHub] [flink] zoltar9264 commented on pull request #20965: [FLINK-29244][state/changelog] Add metric lastMaterializationDuration…

2022-10-10 Thread GitBox


zoltar9264 commented on PR #20965:
URL: https://github.com/apache/flink/pull/20965#issuecomment-1274037452

   @flinkbot run azure





[GitHub] [flink] zoltar9264 commented on a diff in pull request #20965: [FLINK-29244][state/changelog] Add metric lastMaterializationDuration…

2022-10-10 Thread GitBox


zoltar9264 commented on code in PR #20965:
URL: https://github.com/apache/flink/pull/20965#discussion_r991780193


##
docs/content.zh/docs/ops/metrics.md:
##
@@ -1581,6 +1581,11 @@ Note that the metrics are only available via reporters.
   The number of failed materializations.
   Counter
 
+
+  lastMaterializationDuration

Review Comment:
   Thanks @masteryhx , good suggestion.






[GitHub] [flink] zoltar9264 commented on a diff in pull request #20965: [FLINK-29244][state/changelog] Add metric lastMaterializationDuration…

2022-10-10 Thread GitBox


zoltar9264 commented on code in PR #20965:
URL: https://github.com/apache/flink/pull/20965#discussion_r991779836


##
flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java:
##
@@ -251,6 +254,8 @@ private void asyncMaterializationPhase(
 
target.handleMaterializationResult(
 snapshotResult, materializationID, upTo);
 
metrics.reportCompletedMaterialization();
+
metrics.reportMaterializationDuration(

Review Comment:
   Thanks @masteryhx , I think it make sense.






[GitHub] [flink] zoltar9264 commented on a diff in pull request #20965: [FLINK-29244][state/changelog] Add metric lastMaterializationDuration…

2022-10-10 Thread GitBox


zoltar9264 commented on code in PR #20965:
URL: https://github.com/apache/flink/pull/20965#discussion_r991779398


##
docs/content.zh/docs/ops/metrics.md:
##
@@ -1581,6 +1581,11 @@ Note that the metrics are only available via reporters.
   The number of failed materializations.
   Counter
 
+

Review Comment:
   Thanks @masteryhx , It's my mistake.






[GitHub] [flink] flinkbot commented on pull request #21011: [FLINK-29333][TABLE]Support the tableconfig object get configuration infos from flink-conf.yaml file

2022-10-10 Thread GitBox


flinkbot commented on PR #21011:
URL: https://github.com/apache/flink/pull/21011#issuecomment-1274031819

   
   ## CI report:
   
   * 4bc7cb104b31f5c7abd3b52b3b926b0051507b87 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-29333) Flink SQL Planner Module doesn't read config from flink-conf.yaml

2022-10-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29333:
---
Labels: pull-request-available  (was: )

> Flink SQL Planner Module doesn't read config from flink-conf.yaml
> -
>
> Key: FLINK-29333
> URL: https://issues.apache.org/jira/browse/FLINK-29333
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0
>Reporter: Dhananjay Badaya
>Priority: Major
>  Labels: pull-request-available
>
> The PlannerModule class doesn't seem to read the configs from the 
> {{/etc/flink/conf/flink-conf.yaml}} file ([code 
> ref|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-loader/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java#L95]).
>  It only reads the default config values defined in the Java code, so 
> we can't override configs using flink-conf.yaml.
>  
>  
> Use-case: We need to modify the default value of {{io.tmp.dirs}}, as the 
> default value (/tmp) is a symlink on our platform, and 
> {{java.nio.file.Files.createDirectory}} (which is used by Flink in this case) 
> doesn't handle symlinks properly 
> [ref|https://bugs.openjdk.org/browse/JDK-8130464].
> {code:java}
> Caused by: java.nio.file.FileAlreadyExistsException: /tmp at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:88) at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) at 
> sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
>  at java.nio.file.Files.createDirectory(Files.java:674) at 
> java.nio.file.Files.createAndCheckIsDirectory(Files.java:781) at 
> java.nio.file.Files.createDirectories(Files.java:727) at 
> org.apache.flink.table.planner.loader.PlannerModule.(PlannerModule.java:96)
> {code}
>  
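
For context, the JDK behavior referenced above can be reproduced in isolation; a minimal sketch (paths are illustrative, not Flink code):

{code:java}
import java.nio.file.Files;
import java.nio.file.Path;

// Minimal reproduction of JDK-8130464: Files.createDirectories() on a path that
// is a symlink to an existing directory throws FileAlreadyExistsException,
// because the "already a directory?" re-check uses NOFOLLOW_LINKS and a symlink
// does not count as a directory.
public class CreateDirectoriesOnSymlink {
    public static void main(String[] args) throws Exception {
        Path realDir = Files.createTempDirectory("real-tmp");
        Path link = realDir.resolveSibling("tmp-link");
        Files.deleteIfExists(link);
        Files.createSymbolicLink(link, realDir);

        // throws java.nio.file.FileAlreadyExistsException, matching the stack
        // trace above when io.tmp.dirs points at a symlinked /tmp
        Files.createDirectories(link);
    }
}
{code}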





[GitHub] [flink] hehuiyuan opened a new pull request, #21011: [FLINK-29333][TABLE]Support the tableconfig object get configuration infos from flink-conf.yaml file

2022-10-10 Thread GitBox


hehuiyuan opened a new pull request, #21011:
URL: https://github.com/apache/flink/pull/21011

   
   ## What is the purpose of the change
   
   Support the TableConfig object getting configuration from the flink-conf.yaml 
file.
   
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (  no)
 - The runtime per-record code paths (performance sensitive): (no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no )
 - The S3 file system connector: ( no )
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable )
   





[jira] [Resolved] (FLINK-27423) Upgrade Hive 3.1 connector from 3.1.2 to 3.1.3

2022-10-10 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia resolved FLINK-27423.
--
Resolution: Fixed

Closing it since it seems to have been fixed. Feel free to reopen it if not.

> Upgrade Hive 3.1 connector from 3.1.2 to 3.1.3
> --
>
> Key: FLINK-27423
> URL: https://issues.apache.org/jira/browse/FLINK-27423
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Hive
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Jeff Yang
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>
> The latest supported version of the Hive 3.1.* release.





[jira] [Commented] (FLINK-27384) In the Hive dimension table, when the data is changed on the original partition, the create_time configuration does not take effect

2022-10-10 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615432#comment-17615432
 ] 

luoyuxia commented on FLINK-27384:
--

[~leonard] It seems the PRs for the release-1.14 and release-1.15 branches are ready. Could 
you please help merge them when you're free?

> In the Hive dimension table, when the data is changed on the original 
> partition, the create_time configuration does not take effect
> ---
>
> Key: FLINK-27384
> URL: https://issues.apache.org/jira/browse/FLINK-27384
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.14.4, 1.15.1
>Reporter: 陈磊
>Assignee: 陈磊
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Attachments: image-2022-04-25-15-46-01-833.png, 
> image-2022-04-25-15-47-54-213.png
>
>
> In the Hive dimension table, when the data is changed on the original 
> partition, the create_time configuration does not take effect.
> !image-2022-04-25-15-46-01-833.png!
> The current table structure directory is as follows:
> !image-2022-04-25-15-47-54-213.png!
> From the figure above, we can see that when Hive is used as a dimension table, it 
> will load the data of dt=2021-04-22, hr=27.
> If a new partition arrives, the data of the latest partition can be read 
> without problems. However, if the data is modified in the original partition, the 
> modified partition's data should theoretically be read (because of the 
> create_time and latest configuration), but this is not the case in practice: 
> the data that was originally loaded is still read.





[jira] [Resolved] (FLINK-27604) flink sql read hive on hbase throw NPE

2022-10-10 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia resolved FLINK-27604.
--
Resolution: Fixed

> flink sql read hive on hbase throw NPE
> --
>
> Key: FLINK-27604
> URL: https://issues.apache.org/jira/browse/FLINK-27604
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.6
>Reporter: zhangsan
>Priority: Major
>
> I have some table data in HBase. I usually read the HBase data by loading 
> external tables through Hive, and I want to read that data through Flink SQL by 
> reading the Hive tables. When I try this with the SQL client I get an error. I 
> don't know if there is any way to solve this problem, but I can read the data 
> using the Spark engine.
> 
> Environment:
> flink:1.13.6
> hive:2.1.1-cdh6.2.0
> hbase:2.1.0-cdh6.2.0
> flinksql Execution tools:flink sql client 
> sql submit mode:yarn-per-job
> 
> flink lib directory
> antlr-runtime-3.5.2.jar
> flink-csv-1.13.6.jar
> flink-dist_2.11-1.13.6.jar
> flink-json-1.13.6.jar
> flink-shaded-zookeeper-3.4.14.jar
> flink-sql-connector-hive-2.2.0_2.11-1.13.6.jar
> flink-table_2.11-1.13.6.jar
> flink-table-blink_2.11-1.13.6.jar
> guava-14.0.1.jar
> hadoop-mapreduce-client-core-3.0.0-cdh6.2.0.jar
> hbase-client-2.1.0-cdh6.2.0.jar
> hbase-common-2.1.0-cdh6.2.0.jar
> hbase-protocol-2.1.0-cdh6.2.0.jar
> hbase-server-2.1.0-cdh6.2.0.jar
> hive-exec-2.1.1-cdh6.2.0.jar
> hive-hbase-handler-2.1.1-cdh6.2.0.jar
> htrace-core4-4.1.0-incubating.jar
> log4j-1.2-api-2.17.1.jar
> log4j-api-2.17.1.jar
> log4j-core-2.17.1.jar
> log4j-slf4j-impl-2.17.1.jar
> protobuf-java-2.5.0.jar
> 
> step:
> hive create table statement:
> {code:java}
> CREATE EXTERNAL TABLE `ods`.`student`(
>   `row_key` string, 
>   `name` string,
>   `age` int,
>   `addr` string 
> ) 
> ROW FORMAT SERDE 
>   'org.apache.hadoop.hive.hbase.HBaseSerDe' 
> STORED BY 
>   'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
> WITH SERDEPROPERTIES ( 
>   
> 'hbase.columns.mapping'=':key,FINAL:NAME,FINAL:AGE,FINAL:ADDR,'serialization.format'='1')
> TBLPROPERTIES (
>   'hbase.table.name'='ODS:STUDENT'); {code}
> catalog:hive catalog 
> sql: select * from ods.student;
> 
> error:
> {code:java}
> org.apache.flink.table.client.gateway.SqlExecutionException: Could not 
> execute SQL statement.
>     at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:215)
>  ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:235)
>  ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:479) 
> ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:412) 
> ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327)
>  [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_191]
>     at 
> org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327)
>  [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
>  [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
>  [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) 
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) 
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) 
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) 
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> Caused by: org.apache.flink.connectors.hive.FlinkHiveException: Unable to 
> instantiate the hadoop input format
>     at 
> org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createMRSplits(HiveSourceFileEnumerator.java:100)
>  ~[flink-sql-connector-hive-2.2.0_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:71)
>  ~[flink-sql-connector-hive-2.2.0_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:212)
>  ~[flink-sql-connector-hive-2.2.0_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.connectors.hive.HiveParallelismInference.logRunningTime(HiveParallelismInference.java:107)
>  ~[flink-sql-conn

[jira] [Comment Edited] (FLINK-27604) flink sql read hive on hbase throw NPE

2022-10-10 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615429#comment-17615429
 ] 

luoyuxia edited comment on FLINK-27604 at 10/11/22 2:53 AM:


[~18579099...@163.com] Thanks for reporting it. Currently, reading HBase data via 
Hive is not supported in Flink. But I think we may need to support it.


was (Author: luoyuxia):
[~18579099...@163.com] Thanks for reporting it. Currently, reading HBase data via 
Hive is not supported in Flink. But I think it's a valid requirement. 

> flink sql read hive on hbase throw NPE
> --
>
> Key: FLINK-27604
> URL: https://issues.apache.org/jira/browse/FLINK-27604
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.6
>Reporter: zhangsan
>Priority: Major
>
> I have some table data in HBase. I usually read the HBase data by loading 
> external tables through Hive, and I want to read that data through Flink SQL by 
> reading the Hive tables. When I try this with the SQL client I get an error. I 
> don't know if there is any way to solve this problem, but I can read the data 
> using the Spark engine.
> 
> Environment:
> flink:1.13.6
> hive:2.1.1-cdh6.2.0
> hbase:2.1.0-cdh6.2.0
> flinksql Execution tools:flink sql client 
> sql submit mode:yarn-per-job
> 
> flink lib directory
> antlr-runtime-3.5.2.jar
> flink-csv-1.13.6.jar
> flink-dist_2.11-1.13.6.jar
> flink-json-1.13.6.jar
> flink-shaded-zookeeper-3.4.14.jar
> flink-sql-connector-hive-2.2.0_2.11-1.13.6.jar
> flink-table_2.11-1.13.6.jar
> flink-table-blink_2.11-1.13.6.jar
> guava-14.0.1.jar
> hadoop-mapreduce-client-core-3.0.0-cdh6.2.0.jar
> hbase-client-2.1.0-cdh6.2.0.jar
> hbase-common-2.1.0-cdh6.2.0.jar
> hbase-protocol-2.1.0-cdh6.2.0.jar
> hbase-server-2.1.0-cdh6.2.0.jar
> hive-exec-2.1.1-cdh6.2.0.jar
> hive-hbase-handler-2.1.1-cdh6.2.0.jar
> htrace-core4-4.1.0-incubating.jar
> log4j-1.2-api-2.17.1.jar
> log4j-api-2.17.1.jar
> log4j-core-2.17.1.jar
> log4j-slf4j-impl-2.17.1.jar
> protobuf-java-2.5.0.jar
> 
> step:
> hive create table statement:
> {code:java}
> CREATE EXTERNAL TABLE `ods`.`student`(
>   `row_key` string, 
>   `name` string,
>   `age` int,
>   `addr` string 
> ) 
> ROW FORMAT SERDE 
>   'org.apache.hadoop.hive.hbase.HBaseSerDe' 
> STORED BY 
>   'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
> WITH SERDEPROPERTIES ( 
>   
> 'hbase.columns.mapping'=':key,FINAL:NAME,FINAL:AGE,FINAL:ADDR,'serialization.format'='1')
> TBLPROPERTIES (
>   'hbase.table.name'='ODS:STUDENT'); {code}
> catalog:hive catalog 
> sql: select * from ods.student;
> 
> error:
> {code:java}
> org.apache.flink.table.client.gateway.SqlExecutionException: Could not 
> execute SQL statement.
>     at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:215)
>  ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:235)
>  ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:479) 
> ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:412) 
> ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327)
>  [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_191]
>     at 
> org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327)
>  [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
>  [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
>  [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) 
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) 
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) 
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) 
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> Caused by: org.apache.flink.connectors.hive.FlinkHiveException: Unable to 
> instantiate the hadoop input format
>     at 
> org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createMRSplits(HiveSourceFileEnumerator.java:100)
>  ~[flink-sql-connector-hive-2.2.0_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.connectors.hive.HiveSourceFileEnumerator.create

[jira] [Commented] (FLINK-27604) flink sql read hive on hbase throw NPE

2022-10-10 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615429#comment-17615429
 ] 

luoyuxia commented on FLINK-27604:
--

[~18579099...@163.com] Thanks for reporting it. Currently, reading HBase data via 
Hive is not supported in Flink. But I think it's a valid requirement. 

> flink sql read hive on hbase throw NPE
> --
>
> Key: FLINK-27604
> URL: https://issues.apache.org/jira/browse/FLINK-27604
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.13.6
>Reporter: zhangsan
>Priority: Major
>
> I have some table data in HBase. I usually read the HBase data by loading 
> external tables through Hive, and I want to read that data through Flink SQL by 
> reading the Hive tables. When I try this with the SQL client I get an error. I 
> don't know if there is any way to solve this problem, but I can read the data 
> using the Spark engine.
> 
> Environment:
> flink:1.13.6
> hive:2.1.1-cdh6.2.0
> hbase:2.1.0-cdh6.2.0
> flinksql Execution tools:flink sql client 
> sql submit mode:yarn-per-job
> 
> flink lib directory
> antlr-runtime-3.5.2.jar
> flink-csv-1.13.6.jar
> flink-dist_2.11-1.13.6.jar
> flink-json-1.13.6.jar
> flink-shaded-zookeeper-3.4.14.jar
> flink-sql-connector-hive-2.2.0_2.11-1.13.6.jar
> flink-table_2.11-1.13.6.jar
> flink-table-blink_2.11-1.13.6.jar
> guava-14.0.1.jar
> hadoop-mapreduce-client-core-3.0.0-cdh6.2.0.jar
> hbase-client-2.1.0-cdh6.2.0.jar
> hbase-common-2.1.0-cdh6.2.0.jar
> hbase-protocol-2.1.0-cdh6.2.0.jar
> hbase-server-2.1.0-cdh6.2.0.jar
> hive-exec-2.1.1-cdh6.2.0.jar
> hive-hbase-handler-2.1.1-cdh6.2.0.jar
> htrace-core4-4.1.0-incubating.jar
> log4j-1.2-api-2.17.1.jar
> log4j-api-2.17.1.jar
> log4j-core-2.17.1.jar
> log4j-slf4j-impl-2.17.1.jar
> protobuf-java-2.5.0.jar
> 
> step:
> hive create table statement:
> {code:java}
> CREATE EXTERNAL TABLE `ods`.`student`(
>   `row_key` string, 
>   `name` string,
>   `age` int,
>   `addr` string 
> ) 
> ROW FORMAT SERDE 
>   'org.apache.hadoop.hive.hbase.HBaseSerDe' 
> STORED BY 
>   'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
> WITH SERDEPROPERTIES ( 
>   
> 'hbase.columns.mapping'=':key,FINAL:NAME,FINAL:AGE,FINAL:ADDR,'serialization.format'='1')
> TBLPROPERTIES (
>   'hbase.table.name'='ODS:STUDENT'); {code}
> catalog:hive catalog 
> sql: select * from ods.student;
> 
> error:
> {code:java}
> org.apache.flink.table.client.gateway.SqlExecutionException: Could not 
> execute SQL statement.
>     at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:215)
>  ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:235)
>  ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:479) 
> ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:412) 
> ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327)
>  [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_191]
>     at 
> org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327)
>  [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
>  [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
>  [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) 
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) 
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) 
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) 
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> Caused by: org.apache.flink.connectors.hive.FlinkHiveException: Unable to 
> instantiate the hadoop input format
>     at 
> org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createMRSplits(HiveSourceFileEnumerator.java:100)
>  ~[flink-sql-connector-hive-2.2.0_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:71)
>  ~[flink-sql-connector-hive-2.2.0_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:212)
>  ~[flink-sq

[jira] [Closed] (FLINK-26990) BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime failed

2022-10-10 Thread Xingbo Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingbo Huang closed FLINK-26990.

Resolution: Cannot Reproduce

> BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime
>  failed
> ---
>
> Key: FLINK-26990
> URL: https://issues.apache.org/jira/browse/FLINK-26990
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0
>Reporter: Matthias Pohl
>Priority: Minor
>  Labels: auto-deprioritized-major, test-stability
>
> [This 
> build|https://dev.azure.com/mapohl/flink/_build/results?buildId=914&view=logs&j=f3dc9b18-b77a-55c1-591e-264c46fe44d1&t=2d3cd81e-1c37-5c31-0ee4-f5d5cdb9324d&l=25899]
>  failed due to unexpected behavior in 
> {{BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime}}:
> {code}
> Apr 01 11:42:06 [ERROR] 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime
>   Time elapsed: 0.11 s  <<< FAILURE!
> Apr 01 11:42:06 java.lang.AssertionError: 
> Apr 01 11:42:06 
> Apr 01 11:42:06 Expected size: 6 but was: 4 in:
> Apr 01 11:42:06 [Record @ (undef) : 
> +I(c1,0,1969-12-31T23:59:55,1970-01-01T00:00:05),
> Apr 01 11:42:06 Record @ (undef) : 
> +I(c1,0,1970-01-01T00:00,1970-01-01T00:00:10),
> Apr 01 11:42:06 Record @ (undef) : 
> +I(c1,1,1970-01-01T00:00:05,1970-01-01T00:00:15),
> Apr 01 11:42:06 Record @ (undef) : 
> +I(c1,2,1970-01-01T00:00:10,1970-01-01T00:00:20)]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-27598) Improve the exception message when mixing use Python UDF and Pandas UDF

2022-10-10 Thread Xingbo Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingbo Huang reassigned FLINK-27598:


Assignee: Xingbo Huang

> Improve the exception message when mixing use Python UDF and Pandas UDF
> ---
>
> Key: FLINK-27598
> URL: https://issues.apache.org/jira/browse/FLINK-27598
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Xingbo Huang
>Priority: Major
>
> For the following job:
> {code}
> import argparse
> from decimal import Decimal
> from pyflink.common import Row
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, DataTypes
> from pyflink.table.udf import AggregateFunction, udaf
> class DeduplicatedSum(AggregateFunction):
> def create_accumulator(self):
> return {int(0), float(0)}
> def get_value(self, accumulator) -> float:
> sum(accumulator.values())
> def accumulate(self, accumulator, k: int, v: float):
> if k not in accumulator:
> accumulator[k] = v
> def retract(self, accumulator, k: int, v: float):
> if k in accumulator:
> del accumulator[k]
> deduplicated_sum = udaf(f=DeduplicatedSum(),
> func_type="pandas",
> result_type=DataTypes.DOUBLE(),
> input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()])
> class FirstValue(AggregateFunction):
> def create_accumulator(self):
> return [int(-1), float(0)]
> def get_value(self, accumulator) -> float:
> return accumulator[1]
> def accumulate(self, accumulator, k: int, v: float):
> ck = accumulator[0]
> if ck > k:
> accumulator[0] = k
> accumulator[1] = v
> first_value = udaf(f=FirstValue(),
> result_type=DataTypes.DOUBLE(),
> func_type="pandas",
> input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()])
> class LastValue(AggregateFunction):
> def create_accumulator(self):
> return [int(-1), float(0)]
> def get_value(self, accumulator: Row) -> float:
> return accumulator[1]
> def accumulate(self, accumulator: Row, k: int, v: float):
> ck = accumulator[0]
> if ck < k:
> accumulator[0] = k
> accumulator[1] = v
> last_value = udaf(f=LastValue(),
> func_type="pandas",
> result_type=DataTypes.DOUBLE(),
> input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()])
> def create_source_table_trades(table_env):
> source = f"""
> CREATE TABLE src_trade (
> `id` VARCHAR
> ,`timestamp` BIGINT
> ,`side` VARCHAR
> ,`price` DOUBLE
> ,`size` DOUBLE
> ,`uniqueId` BIGINT
> ,ts_micro AS `timestamp`
> ,ts_milli AS `timestamp` / 1000
> ,ts AS TO_TIMESTAMP_LTZ(`timestamp` / 1000, 3)
> ,WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
> ) WITH (
> 'connector' = 'datagen')
> """
> table_env.execute_sql(source)
> def create_sink_table(table_env):
> sink = f"""
> CREATE TABLE dst_kline (
> wst TIMESTAMP_LTZ(3)
> ,wet TIMESTAMP_LTZ(3)
> ,otm BIGINT
> ,ot TIMESTAMP_LTZ(3)
> ,ctm BIGINT
> ,ct TIMESTAMP_LTZ(3)
> ,ptm BIGINT
> ,pt TIMESTAMP_LTZ(3)
> ,`open` DOUBLE
> ,`close` DOUBLE
> ,`high` DOUBLE
> ,`low` DOUBLE
> ,`vol` DOUBLE -- total trade volume
> ,`to` DOUBLE -- total turnover value
> ,`rev` INT -- revision, something we might use for versioning
> ,`gap` INT -- if this value is reliable
> ,PRIMARY KEY(wst) NOT ENFORCED
> ) WITH (
> 'connector' = 'print'
> )
> """
> table_env.execute_sql(sink)
> def kafka_src_topic(value):
> if not len(value.split('-')) == 5:
> raise argparse.ArgumentTypeError("{} is not a valid kafka 
> topic".format(value))
> return value
> def interval(value):
> i = []
> prev_num = []
> for character in value:
> if character.isalpha():
> if prev_num:
> num = Decimal(''.join(prev_num))
> if character == 'd':
> i.append(f"'\{num}' DAYS")
> elif character == 'h':
> i.append(f"'\{num}' HOURS")
> elif character == 'm':
> i.append(f"'\{num}' MINUTES")
> elif character == 's':
> i.append(f"'\{num}' SECONDS")
> prev_num = []
> elif character.isnumeric() or character == '.':
> prev_num.append(character)
> return " ".join(i)
> def fetch_arguments_flink_kline():
> import argparse
> parser = argparse.ArgumentParser()
> parser.add_argument('--bootstrap-servers', type=str, required=True)
> parser.add_argument('--src-topic', type=kafka_src_topic)
> parser.add_argument('--consume-mode', type=str, default='group-offsets',
> choices=['group-offsets', 'latest-offset'],
> help='scan.startup.mode for kafka')
> parser.add_argument('--interval', type=str, default='20s',
> help='output interval e.g. 5d4h3m1s, default to 20s')
> parser.add_argument('--force-test', action='store_true')
> parser.add_argument('--consumer-group-hint', type=str, default='1')
> args = parser.parse_args()
> if args.force_test and args.consumer_group_hint == '1':
> parser.error("With --force-test, should not use default '1' for 
> --consumer-group-hint")
> return args
> def main():
> # args = fetch_arguments_flink_kline()
> # parts = args.src_t

[jira] [Closed] (FLINK-27715) Add more Python ML examples

2022-10-10 Thread Xingbo Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingbo Huang closed FLINK-27715.

Resolution: Fixed

Merged into master via 91e74648bf47d1027ef3362c61456b90fcf89535

> Add more Python ML examples
> ---
>
> Key: FLINK-27715
> URL: https://issues.apache.org/jira/browse/FLINK-27715
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Library / Machine Learning
>Reporter: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28394) Python py36-cython: InvocationError for command install_command.sh fails with exit code 1

2022-10-10 Thread Xingbo Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingbo Huang closed FLINK-28394.

Resolution: Cannot Reproduce

> Python py36-cython: InvocationError for command install_command.sh fails with 
> exit code 1
> -
>
> Key: FLINK-28394
> URL: https://issues.apache.org/jira/browse/FLINK-28394
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.15.2
>Reporter: Martijn Visser
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: stale-assigned, test-stability
>
> {code:java}
> Jul 05 03:47:22 Picked up JAVA_TOOL_OPTIONS: -XX:+HeapDumpOnOutOfMemoryError
> Jul 05 03:47:32 Using Python version 3.8.13 (default, Mar 28 2022 11:38:47)
> Jul 05 03:47:32 pip_test_code.py success!
> Jul 05 03:47:32 py38-cython finish: run-test  after 1658.14 seconds
> Jul 05 03:47:32 py38-cython start: run-test-post 
> Jul 05 03:47:32 py38-cython finish: run-test-post  after 0.00 seconds
> Jul 05 03:47:32 ___ summary 
> 
> Jul 05 03:47:32 ERROR:   py36-cython: InvocationError for command 
> /__w/3/s/flink-python/dev/install_command.sh --exists-action w 
> .tox/.tmp/package/1/apache-flink-1.15.dev0.zip (exited with code 1)
> Jul 05 03:47:32   py37-cython: commands succeeded
> Jul 05 03:47:32   py38-cython: commands succeeded
> Jul 05 03:47:32 cleanup 
> /__w/3/s/flink-python/.tox/.tmp/package/1/apache-flink-1.15.dev0.zip
> Jul 05 03:47:33 tox checks... [FAILED]
> Jul 05 03:47:33 Process exited with EXIT CODE: 1.
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37604&view=logs&j=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3&t=85189c57-d8a0-5c9c-b61d-fc05cfac62cf&l=27789



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28526) Fail to lateral join with UDTF from Table with timstamp column

2022-10-10 Thread Xingbo Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingbo Huang reassigned FLINK-28526:


Assignee: Xingbo Huang

> Fail to lateral join with UDTF from Table with timstamp column
> --
>
> Key: FLINK-28526
> URL: https://issues.apache.org/jira/browse/FLINK-28526
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.15.1
>Reporter: Xuannan Su
>Assignee: Xingbo Huang
>Priority: Major
>
> The bug can be reproduced with the following test
> {code:python}
> def test_flink(self):
> env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(env)
> table = t_env.from_descriptor(
> TableDescriptor.for_connector("filesystem")
> .schema(
> Schema.new_builder()
> .column("name", DataTypes.STRING())
> .column("cost", DataTypes.INT())
> .column("distance", DataTypes.INT())
> .column("time", DataTypes.TIMESTAMP(3))
> .watermark("time", "`time` - INTERVAL '60' SECOND")
> .build()
> )
> .format("csv")
> .option("path", "./input.csv")
> .build()
> )
> @udtf(result_types=DataTypes.INT())
> def table_func(row: Row):
> return row.cost + row.distance
> table = table.join_lateral(table_func.alias("cost_times_distance"))
> table.execute().print()
> {code}
> It causes the following exception
> {code:none}
> E   pyflink.util.exceptions.TableException: 
> org.apache.flink.table.api.TableException: Unsupported Python SqlFunction 
> CAST.
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:146)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:429)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:135)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.extractPythonTableFunctionInfo(CommonExecPythonCorrelate.java:133)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.createPythonOneInputTransformation(CommonExecPythonCorrelate.java:106)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.translateToPlanInternal(CommonExecPythonCorrelate.java:95)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:249)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:136)
> E at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
> E at 
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:79)
> E at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> E at scala.collection.Iterator.foreach(Iterator.scala:937)
> E at scala.collection.Iterator.foreach$(Iterator.scala:937)
> E at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> E at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> E at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> E at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> E at 
> scala.collection.TraversableLike.map(TraversableLike.scala:233)
> E at 
> scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> E at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104)
> E at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:78)
> E at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:181)
> E at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656)
> E at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828)
> E at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317)
> E at 
> org.apache.flink.table.api.inte

[jira] [Assigned] (FLINK-28742) Table.to_pandas fails with lit("xxx")

2022-10-10 Thread Xingbo Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingbo Huang reassigned FLINK-28742:


Assignee: Xingbo Huang

> Table.to_pandas fails with lit("xxx")
> -
>
> Key: FLINK-28742
> URL: https://issues.apache.org/jira/browse/FLINK-28742
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.15.1
>Reporter: Xuannan Su
>Assignee: Xingbo Huang
>Priority: Major
>
> Table.to_pandas method throws the following exception when the table contains 
> lit("anyString").
>  
> {code:none}
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
> : java.lang.UnsupportedOperationException: Python vectorized UDF doesn't 
> support logical type CHAR(3) NOT NULL currently.
>     at 
> org.apache.flink.table.runtime.arrow.ArrowUtils$LogicalTypeToArrowTypeConverter.defaultMethod(ArrowUtils.java:743)
>     at 
> org.apache.flink.table.runtime.arrow.ArrowUtils$LogicalTypeToArrowTypeConverter.defaultMethod(ArrowUtils.java:617)
>     at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:62)
>     at org.apache.flink.table.types.logical.CharType.accept(CharType.java:148)
>     at 
> org.apache.flink.table.runtime.arrow.ArrowUtils.toArrowField(ArrowUtils.java:189)
>     at 
> org.apache.flink.table.runtime.arrow.ArrowUtils.lambda$toArrowSchema$0(ArrowUtils.java:180)
>     at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>     at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>     at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>     at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>     at 
> org.apache.flink.table.runtime.arrow.ArrowUtils.toArrowSchema(ArrowUtils.java:181)
>     at 
> org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame(ArrowUtils.java:483)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>     at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>     at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>     at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>     at java.lang.Thread.run(Thread.java:748)
>  {code}
>  
> The code to reproduce the problem
> {code:python}
> env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(env)
> src_table = t_env.from_data_stream(
> env.from_collection([1, 2], type_info=BasicTypeInfo.INT_TYPE_INFO())
> )
> table = src_table.select(expr.lit("123"))
> # table.execute().print()
> print(table.to_pandas()){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28528) Table.getSchema fails on table with watermark

2022-10-10 Thread Xingbo Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingbo Huang reassigned FLINK-28528:


Assignee: Xingbo Huang

> Table.getSchema fails on table with watermark
> -
>
> Key: FLINK-28528
> URL: https://issues.apache.org/jira/browse/FLINK-28528
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.15.1
>Reporter: Xuannan Su
>Assignee: Xingbo Huang
>Priority: Major
>
> The bug can be reproduced with the following test. The test can pass if we 
> use the commented way to define the watermark.
> {code:python}
> def test_flink_2(self):
> env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(env)
> table = t_env.from_descriptor(
> TableDescriptor.for_connector("filesystem")
> .schema(
> Schema.new_builder()
> .column("name", DataTypes.STRING())
> .column("cost", DataTypes.INT())
> .column("distance", DataTypes.INT())
> .column("time", DataTypes.TIMESTAMP(3))
> .watermark("time", expr.col("time") - expr.lit(60).seconds)
> # .watermark("time", "`time` - INTERVAL '60' SECOND")
> .build()
> )
> .format("csv")
> .option("path", "./input.csv")
> .build()
> )
> print(table.get_schema())
> {code}
> It causes the following exception
> {code:none}
> E   pyflink.util.exceptions.TableException: 
> org.apache.flink.table.api.TableException: Expression 'minus(time, 6)' is 
> not string serializable. Currently, only expressions that originated from a 
> SQL expression have a well-defined string representation.
> E at 
> org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51)
> E at 
> org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455)
> E at 
> java.util.Collections$SingletonList.forEach(Collections.java:4824)
> E at 
> org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451)
> E at org.apache.flink.table.api.Table.getSchema(Table.java:101)
> E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> E at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> E at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> E at java.lang.reflect.Method.invoke(Method.java:498)
> E at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> E at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> E at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> E at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> E at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> E at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> E at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28786) Cannot run PyFlink 1.16 on MacOS with M1 chip

2022-10-10 Thread Xingbo Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingbo Huang closed FLINK-28786.

Resolution: Not A Problem

> Cannot run PyFlink 1.16 on MacOS with M1 chip
> -
>
> Key: FLINK-28786
> URL: https://issues.apache.org/jira/browse/FLINK-28786
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0
>Reporter: Ran Tao
>Priority: Major
>
> I have tested it with 2 M1 machines; it reproduces the bug 100%.
> 1. M1 machine
> macOS Big Sur 11.5.1 & jdk8 * & jdk11 & python 3.8 & python 3.9
> 2. M1 machine
> macOS Monterey 12.1 & jdk8 * & jdk11 & python 3.8 & python 3.9
> reproduce step:
> 1.python -m pip install -r flink-python/dev/dev-requirements.txt
> 2.cd flink-python; python setup.py sdist bdist_wheel; cd 
> apache-flink-libraries; python setup.py sdist; cd ..;
> 3.python -m pip install apache-flink-libraries/dist/*.tar.gz
> 4.python -m pip install dist/*.whl
> when run 
> [word_count.py|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/table_api_tutorial/]
>  it will cause
> {code:java}
> :219: RuntimeWarning: 
> apache_beam.coders.coder_impl.StreamCoderImpl size changed, may indicate 
> binary incompatibility. Expected 24 from C header, got 32 from PyObject
> Traceback (most recent call last):
>   File "/Users/chucheng/GitLab/pyflink-demo/table/streaming/word_count.py", 
> line 129, in 
> word_count(known_args.input, known_args.output)
>   File "/Users/chucheng/GitLab/pyflink-demo/table/streaming/word_count.py", 
> line 49, in word_count
> t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
>   File 
> "/Users/chucheng/venv/lib/python3.8/site-packages/pyflink/table/table_environment.py",
>  line 121, in create
> return TableEnvironment(j_tenv)
>   File 
> "/Users/chucheng/venv/lib/python3.8/site-packages/pyflink/table/table_environment.py",
>  line 100, in __init__
> self._open()
>   File 
> "/Users/chucheng/venv/lib/python3.8/site-packages/pyflink/table/table_environment.py",
>  line 1637, in _open
> startup_loopback_server()
>   File 
> "/Users/chucheng/venv/lib/python3.8/site-packages/pyflink/table/table_environment.py",
>  line 1628, in startup_loopback_server
> from pyflink.fn_execution.beam.beam_worker_pool_service import \
>   File 
> "/Users/chucheng/venv/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_worker_pool_service.py",
>  line 44, in 
> from pyflink.fn_execution.beam import beam_sdk_worker_main  # noqa # 
> pylint: disable=unused-import
>   File 
> "/Users/chucheng/venv/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py",
>  line 21, in 
> import pyflink.fn_execution.beam.beam_operations # noqa # pylint: 
> disable=unused-import
>   File 
> "/Users/chucheng/venv/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations.py",
>  line 27, in 
> from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend, 
> RemoteOperatorStateBackend
>   File 
> "/Users/chucheng/venv/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py",
>  line 33, in 
> from pyflink.fn_execution.beam.beam_coders import FlinkCoder
>   File 
> "/Users/chucheng/venv/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_coders.py",
>  line 27, in 
> from pyflink.fn_execution.beam import beam_coder_impl_fast as 
> beam_coder_impl
>   File "pyflink/fn_execution/beam/beam_coder_impl_fast.pyx", line 1, in init 
> pyflink.fn_execution.beam.beam_coder_impl_fast
> KeyError: '__pyx_vtable__'
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] weibozhao commented on a diff in pull request #156: [FLINK-29323] Refine Transformer for VectorAssembler

2022-10-10 Thread GitBox


weibozhao commented on code in PR #156:
URL: https://github.com/apache/flink-ml/pull/156#discussion_r991765238


##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java:
##
@@ -47,10 +47,15 @@
 
 /**
  * A Transformer which combines a given list of input columns into a vector 
column. Types of input
- * columns must be either vector or numerical value.
+ * columns must be either vector or numerical types. The elements assembled in 
the same column must
+ * have the same size. If the element is null or has the wrong size, we will 
process this case with
+ * {@link HasHandleInvalid} parameter as follows:
  *
- * The `keep` option of {@link HasHandleInvalid} means that we output bad 
rows with output column
- * set to null.
+ * The `keep` option means that we do not check the vector size, and keep 
all rows.

Review Comment:
   The output value will have a different vector size from the normal ones. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] HuangXingBo commented on a diff in pull request #20756: [FLINK-27929][python] Drop support for python36

2022-10-10 Thread GitBox


HuangXingBo commented on code in PR #20756:
URL: https://github.com/apache/flink/pull/20756#discussion_r991762362


##
flink-python/setup.py:
##
@@ -316,16 +316,10 @@ def extracted_output_files(base_dir, file_path, 
output_directory):
 'python_full_version >= "3.7" and platform_system != 
"Windows"',
 'httplib2>=0.19.0,<=0.20.4', 
apache_flink_libraries_dependency]
 
-if sys.version_info < (3, 7):
-# python 3.6 upper and lower limit
-install_requires.append('numpy>=1.14.3,<1.20')
-install_requires.append('pandas>=1.0,<1.2.0')
-install_requires.append('pyarrow>=0.15.1,<7.0.0')
-else:
-# python 3.7, 3.8 and 3.9 upper limit and M1 chip lower limit,
-install_requires.append('numpy>=1.21.4,<1.22.0')
-install_requires.append('pandas>=1.3.0,<1.4.0')
-install_requires.append('pyarrow>=5.0.0,<9.0.0')
+# python 3.7, 3.8 and 3.9 upper limit and M1 chip lower limit,

Review Comment:
   Since we dropped support for Python 3.6, we can put these install dependencies 
in `install_requires` directly.



##
flink-python/setup.py:
##
@@ -27,12 +27,12 @@
 
 from setuptools import setup, Extension
 
-if sys.version_info < (3, 6):
-print("Python versions prior to 3.6 are not supported for PyFlink.",
+if sys.version_info < (3, 7):
+print("Python versions prior to 3.7 are not supported for PyFlink.",
   file=sys.stderr)
 sys.exit(-1)
-elif sys.version_info.minor == 6:
-warnings.warn("Python version 3.6 won't be supported for PyFlink after 
1.16.")

Review Comment:
   We haven't planned to drop support for Python 3.7.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29572) Flink Task Manager skip loopback interface for resource manager registration

2022-10-10 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615420#comment-17615420
 ] 

Xintong Song commented on FLINK-29572:
--

I think this was introduced by FLINK-24474.

In FLINK-24474, we changed the default bind address to the loopback address. 
However, when using loopback, the address reported to JM/RM should be different 
from the bind address. Thus, `ConnectionUtils#findAddressUsingStrategy` should 
return `InetAddress.getLocalHost()` in this case.

[~chesnay], could you confirm this?
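
To illustrate the idea, here is a minimal standalone sketch of the expected behaviour only 
(the class and method names are made up and this is not the actual `ConnectionUtils` code): 
if the bind address is the loopback address, the address announced to the JM/RM would fall 
back to the machine's local host address instead of 127.0.0.1.

{code:java}
import java.net.InetAddress;

public class AnnouncedAddressSketch {

    // Hypothetical helper: pick the address to report to the ResourceManager.
    static InetAddress addressToAnnounce(InetAddress bindAddress) throws Exception {
        if (bindAddress.isLoopbackAddress()) {
            // Bound to 127.0.0.1 locally, but announce a routable address instead.
            return InetAddress.getLocalHost();
        }
        return bindAddress;
    }

    public static void main(String[] args) throws Exception {
        // Prints the local host address rather than the loopback address.
        System.out.println(addressToAnnounce(InetAddress.getLoopbackAddress()));
    }
}
{code}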

> Flink Task Manager skip loopback interface for resource manager registration
> 
>
> Key: FLINK-29572
> URL: https://issues.apache.org/jira/browse/FLINK-29572
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.15.2
> Environment: Flink 1.15.2
> Kubernetes with Istio Proxy
>Reporter: Kevin Li
>Priority: Major
>
> Currently the Flink Task Manager tries different local interfaces to bind when 
> connecting to the Resource Manager. The first one is the loopback interface. Normally, 
> if the Job Manager is running on a remote host/container, connecting via the loopback 
> interface will fail and the Task Manager will pick up the correct IP address.
> However, if the Task Manager is running with some proxy, the loopback interface can 
> connect to the remote host as well. This results in 127.0.0.1 being reported to the 
> Resource Manager during registration even though the Job Manager/Resource Manager runs 
> on a remote host, and problems will happen. For us, only one Task Manager can 
> register in this case.
> I suggest adding a configuration option to skip the loopback interface check if we know 
> the Job/Resource Manager is running on a remote host/container.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29572) Flink Task Manager skip loopback interface for resource manager registration

2022-10-10 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-29572:
-
Issue Type: Bug  (was: Improvement)

> Flink Task Manager skip loopback interface for resource manager registration
> 
>
> Key: FLINK-29572
> URL: https://issues.apache.org/jira/browse/FLINK-29572
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.15.2
> Environment: Flink 1.15.2
> Kubernetes with Istio Proxy
>Reporter: Kevin Li
>Priority: Major
>
> Currently the Flink Task Manager tries different local interfaces to bind when 
> connecting to the Resource Manager. The first one is the loopback interface. Normally, 
> if the Job Manager is running on a remote host/container, connecting via the loopback 
> interface will fail and the Task Manager will pick up the correct IP address.
> However, if the Task Manager is running with some proxy, the loopback interface can 
> connect to the remote host as well. This results in 127.0.0.1 being reported to the 
> Resource Manager during registration even though the Job Manager/Resource Manager runs 
> on a remote host, and problems will happen. For us, only one Task Manager can 
> register in this case.
> I suggest adding a configuration option to skip the loopback interface check if we know 
> the Job/Resource Manager is running on a remote host/container.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29000) Support python UDF in the SQL Gateway

2022-10-10 Thread Xingbo Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingbo Huang reassigned FLINK-29000:


Assignee: Xingbo Huang

> Support python UDF in the SQL Gateway
> -
>
> Key: FLINK-29000
> URL: https://issues.apache.org/jira/browse/FLINK-29000
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Table SQL / Gateway
>Affects Versions: 1.17.0
>Reporter: Shengkai Fang
>Assignee: Xingbo Huang
>Priority: Major
>
> Currently Flink SQL Client supports python UDF, the Gateway should also 
> support this feature if the SQL Client is able to submit SQL to the Gateway.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29575) Can't Add Temporary Function USING JAR

2022-10-10 Thread Xingbo Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615418#comment-17615418
 ] 

Xingbo Huang commented on FLINK-29575:
--

This feature was introduced in release 1.16. Is the version of Flink you are using 1.15.1?
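
For reference, on Flink 1.16+ the statement from the report can also be submitted from a 
Table API program. Below is a minimal, untested sketch; the class name and jar path are the 
reporter's placeholders, not real artifacts:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateFunctionUsingJarSketch {
    public static void main(String[] args) {
        // Requires Flink 1.16+, where CREATE FUNCTION ... USING JAR is supported.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TEMPORARY SYSTEM FUNCTION MyFunc AS "
                        + "'com.ballista.my.classpath.MyFunc' LANGUAGE SCALA "
                        + "USING JAR '/path/to/my.jar'");
    }
}
{code}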

> Can't Add Temporary Function USING JAR
> --
>
> Key: FLINK-29575
> URL: https://issues.apache.org/jira/browse/FLINK-29575
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.15.1
>Reporter: Angelo Kastroulis
>Priority: Minor
>
> In the create documentation for Flink SQL it says:
> {code:java}
> CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION 
>   [IF NOT EXISTS] [catalog_name.][db_name.]function_name 
>   AS identifier [LANGUAGE JAVA|SCALA|PYTHON] 
>   [USING JAR '.jar' [, JAR '.jar']* ] 
> {code}
> However, this:
> {code:java}
> CREATE TEMPORARY SYSTEM FUNCTION MyFunc AS
> 'com.ballista.my.classpath.MyFunc' LANGUAGE SCALA 
> USING JAR 'path-to-my.jar'{code}
> Throws this exception:
> {code:java}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.sql.parser.impl.ParseException: Encountered "USING" at line 
> 3, column 3.
> Was expecting one of:
>      
>     ";" ... {code}
> This works:
> {code:java}
> ADD JAR 'path-to-my.jar'; 
> CREATE TEMPORARY SYSTEM FUNCTION MyFunc AS    
> 'com.ballista.my.classpath.MyFunc' LANGUAGE SCALA;{code}
> Is this a bug in the code, or is the documentation perhaps not up to date?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] luoyuxia commented on pull request #20907: Flink 29337 hive3

2022-10-10 Thread GitBox


luoyuxia commented on PR #20907:
URL: https://github.com/apache/flink/pull/20907#issuecomment-1274002208

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-29427) LookupJoinITCase failed with classloader problem

2022-10-10 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren reassigned FLINK-29427:
-

Assignee: Alexander Smirnov

> LookupJoinITCase failed with classloader problem
> 
>
> Key: FLINK-29427
> URL: https://issues.apache.org/jira/browse/FLINK-29427
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Assignee: Alexander Smirnov
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 2022-09-27T02:49:20.9501313Z Sep 27 02:49:20 Caused by: 
> org.codehaus.janino.InternalCompilerException: Compiling 
> "KeyProjection$108341": Trying to access closed classloader. Please check if 
> you store classloaders directly or indirectly in static fields. If the 
> stacktrace suggests that the leak occurs in a third party library and cannot 
> be fixed immediately, you can disable this check with the configuration 
> 'classloader.check-leaked-classloader'.
> 2022-09-27T02:49:20.9502654Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:382)
> 2022-09-27T02:49:20.9503366Z Sep 27 02:49:20  at 
> org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> 2022-09-27T02:49:20.9504044Z Sep 27 02:49:20  at 
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> 2022-09-27T02:49:20.9504704Z Sep 27 02:49:20  at 
> org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> 2022-09-27T02:49:20.9505341Z Sep 27 02:49:20  at 
> org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> 2022-09-27T02:49:20.9505965Z Sep 27 02:49:20  at 
> org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> 2022-09-27T02:49:20.9506584Z Sep 27 02:49:20  at 
> org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> 2022-09-27T02:49:20.9507261Z Sep 27 02:49:20  at 
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:104)
> 2022-09-27T02:49:20.9507883Z Sep 27 02:49:20  ... 30 more
> 2022-09-27T02:49:20.9509266Z Sep 27 02:49:20 Caused by: 
> java.lang.IllegalStateException: Trying to access closed classloader. Please 
> check if you store classloaders directly or indirectly in static fields. If 
> the stacktrace suggests that the leak occurs in a third party library and 
> cannot be fixed immediately, you can disable this check with the 
> configuration 'classloader.check-leaked-classloader'.
> 2022-09-27T02:49:20.9510835Z Sep 27 02:49:20  at 
> org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:184)
> 2022-09-27T02:49:20.9511760Z Sep 27 02:49:20  at 
> org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
> 2022-09-27T02:49:20.9512456Z Sep 27 02:49:20  at 
> java.lang.Class.forName0(Native Method)
> 2022-09-27T02:49:20.9513014Z Sep 27 02:49:20  at 
> java.lang.Class.forName(Class.java:348)
> 2022-09-27T02:49:20.9513649Z Sep 27 02:49:20  at 
> org.codehaus.janino.ClassLoaderIClassLoader.findIClass(ClassLoaderIClassLoader.java:89)
> 2022-09-27T02:49:20.9514339Z Sep 27 02:49:20  at 
> org.codehaus.janino.IClassLoader.loadIClass(IClassLoader.java:312)
> 2022-09-27T02:49:20.9514990Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:8556)
> 2022-09-27T02:49:20.9515659Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6749)
> 2022-09-27T02:49:20.9516337Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6594)
> 2022-09-27T02:49:20.9516989Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6573)
> 2022-09-27T02:49:20.9517632Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler.access$13900(UnitCompiler.java:215)
> 2022-09-27T02:49:20.9518319Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6481)
> 2022-09-27T02:49:20.9519018Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6476)
> 2022-09-27T02:49:20.9519680Z Sep 27 02:49:20  at 
> org.codehaus.janino.Java$ReferenceType.accept(Java.java:3928)
> 2022-09-27T02:49:20.9520386Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6476)
> 2022-09-27T02:49:20.9521042Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6469)
> 2022-09-27T02:49:20.9521677Z Sep 27 02:49:20  at 
> org.codehaus.janino.Java$ReferenceType.accept(Java.java:3927)
> 2022-09-27T02:49:20.9522299Z Sep 27 02:49:20  at 
> org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6469)
> 2022-09-27T02:49:2

[jira] [Closed] (FLINK-29462) LookupJoinITCase failed on azure due to classloader leaking

2022-10-10 Thread Xingbo Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingbo Huang closed FLINK-29462.

Resolution: Duplicate

> LookupJoinITCase failed on azure due to classloader leaking
> ---
>
> Key: FLINK-29462
> URL: https://issues.apache.org/jira/browse/FLINK-29462
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: Yun Gao
>Priority: Critical
>  Labels: test-stability
>
> {code:java}
> 09:53:45,656 [ForkJoinPool.commonPool-worker-8] WARN  
> org.apache.flink.table.runtime.generated.GeneratedClass      [] - Failed to 
> compile split code, falling back to original code
> org.apache.flink.util.FlinkRuntimeException: 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>   at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
>  ~[flink-table-runtime-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>   at 
> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97)
>  ~[flink-table-runtime-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>   at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
>  ~[flink-table-runtime-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>   at 
> org.apache.flink.table.runtime.keyselector.GenericRowDataKeySelector.open(GenericRowDataKeySelector.java:50)
>  ~[flink-table-runtime-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>   at 
> org.apache.flink.table.runtime.functions.table.lookup.fullcache.inputformat.InputSplitCacheLoadTask.(InputSplitCacheLoadTask.java:60)
>  ~[flink-table-runtime-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>   at 
> org.apache.flink.table.runtime.functions.table.lookup.fullcache.inputformat.InputFormatCacheLoader.createCacheLoadTask(InputFormatCacheLoader.java:135)
>  ~[flink-table-runtime-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>   at 
> org.apache.flink.table.runtime.functions.table.lookup.fullcache.inputformat.InputFormatCacheLoader.lambda$reloadCache$0(InputFormatCacheLoader.java:84)
>  ~[flink-table-runtime-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) 
> [?:1.8.0_292]
>   at 
> java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>  [?:1.8.0_292]
>   at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) 
> [?:1.8.0_292]
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
> [?:1.8.0_292]
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) 
> [?:1.8.0_292]
>   at 
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) 
> [?:1.8.0_292]
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) 
> [?:1.8.0_292]
>   at 
> org.apache.flink.table.runtime.functions.table.lookup.fullcache.inputformat.InputFormatCacheLoader.reloadCache(InputFormatCacheLoader.java:85)
>  [flink-table-runtime-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>   at 
> org.apache.flink.table.runtime.functions.table.lookup.fullcache.CacheLoader.run(CacheLoader.java:105)
>  [flink-table-runtime-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
>   at 
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
>  [?:1.8.0_292]
>   at 
> java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1632)
>  [?:1.8.0_292]
>   at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
> [?:1.8.0_292]
>   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 
> [?:1.8.0_292]
>   at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) 
> [?:1.8.0_292]
>   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) 
> [?:1.8.0_292]
> Caused by: 
> org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
>  org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>   at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
>  ~[flink-shaded-guava-30.1.1-jre-15.0.jar:30.1.1-jre-15.0]
>   at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962)
>  ~[flink-shaded-guava-30.1.1-jre-15.0.jar:30.1.1-jre-15.0]
>   at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859)
>  ~[flink-shaded-guava-30.1.1-jre-15.0.jar:30.1.1-jre-15.0]
>   at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92)

[jira] [Commented] (FLINK-29545) kafka consuming stop when trigger first checkpoint

2022-10-10 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615416#comment-17615416
 ] 

xiaogang zhou commented on FLINK-29545:
---

[~martijnvisser] this can also happen in 1.15. 

 

"Source: Custom Source (10/40)#0" Id=67 BLOCKED on java.lang.Object@6f54b364 
owned by "Legacy Source Thread - Source: Custom Source (10/40)#0" Id=81
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
- blocked on java.lang.Object@6f54b364
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at

 

The Legacy Source Thread - Source: Custom Source (10/40)#0 is requesting a memory 
segment, and the LocalBufferPool recycle method needs to be called by the 
Source: Custom Source (10/40)#0 thread, so this can deadlock when a checkpoint 
happens.
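
The pattern, reduced to a minimal standalone sketch (the class, thread and field names below 
are made up for illustration and are not Flink code): one thread holds the checkpoint lock 
while blocking on an empty buffer pool, and the only thread that could return a buffer is 
itself waiting for the same lock.

{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class CheckpointDeadlockSketch {
    private static final Object checkpointLock = new Object();
    private static final BlockingQueue<byte[]> bufferPool = new ArrayBlockingQueue<>(1);

    public static void main(String[] args) throws Exception {
        Thread legacySource = new Thread(() -> {
            synchronized (checkpointLock) {     // source emits records under the checkpoint lock
                try {
                    bufferPool.take();          // blocks forever: the pool is empty
                } catch (InterruptedException ignored) {
                }
            }
        }, "legacy-source-thread");

        Thread mailboxThread = new Thread(() -> {
            synchronized (checkpointLock) {     // the checkpoint mail needs the same lock
                bufferPool.offer(new byte[0]);  // would recycle a buffer, but is never reached
            }
        }, "task-mailbox-thread");

        legacySource.start();
        Thread.sleep(100);                      // let the source grab the lock first
        mailboxThread.start();
        // Both threads now block each other -- the same shape as the reported hang.
    }
}
{code}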

> kafka consuming stop when trigger first checkpoint
> --
>
> Key: FLINK-29545
> URL: https://issues.apache.org/jira/browse/FLINK-29545
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Network
>Affects Versions: 1.13.3
>Reporter: xiaogang zhou
>Priority: Critical
> Attachments: backpressure 100 busy 0.png, task acknowledge na.png, 
> task dag.png
>
>
> The task DAG is like the attached file. The task is started to consume from the 
> earliest offset, and it stops when the first checkpoint triggers.
>  
> Is this normal? The sink is busy 0 and the second operator has 100 backpressure.
>  
> Checking the checkpoint summary, we can find that some of the sub tasks are n/a.
> I tried to debug this issue and found that in 
> triggerCheckpointAsync, the 
> triggerCheckpointAsyncInMailbox took a lot of time to call.
>  
>  
> Looks like this has something to do with 
> logCheckpointProcessingDelay. Is there any fix for this issue?
>  
>  
> can anybody help me on this issue?
>  
> thanks



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29419) HybridShuffle.testHybridFullExchangesRestart hangs

2022-10-10 Thread Xingbo Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615413#comment-17615413
 ] 

Xingbo Huang commented on FLINK-29419:
--

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=41837&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7

> HybridShuffle.testHybridFullExchangesRestart hangs
> --
>
> Key: FLINK-29419
> URL: https://issues.apache.org/jira/browse/FLINK-29419
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Huang Xingbo
>Assignee: Weijie Guo
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> {code:java}
> 2022-09-26T10:56:44.0766792Z Sep 26 10:56:44 "ForkJoinPool-1-worker-25" #27 
> daemon prio=5 os_prio=0 tid=0x7f41a4efa000 nid=0x6d76 waiting on 
> condition [0x7f40ac135000]
> 2022-09-26T10:56:44.0767432Z Sep 26 10:56:44java.lang.Thread.State: 
> WAITING (parking)
> 2022-09-26T10:56:44.0767892Z Sep 26 10:56:44  at sun.misc.Unsafe.park(Native 
> Method)
> 2022-09-26T10:56:44.0768644Z Sep 26 10:56:44  - parking to wait for  
> <0xa0704e18> (a java.util.concurrent.CompletableFuture$Signaller)
> 2022-09-26T10:56:44.0769287Z Sep 26 10:56:44  at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> 2022-09-26T10:56:44.0769949Z Sep 26 10:56:44  at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> 2022-09-26T10:56:44.0770623Z Sep 26 10:56:44  at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3313)
> 2022-09-26T10:56:44.0771349Z Sep 26 10:56:44  at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> 2022-09-26T10:56:44.0772092Z Sep 26 10:56:44  at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2022-09-26T10:56:44.0772777Z Sep 26 10:56:44  at 
> org.apache.flink.test.runtime.JobGraphRunningUtil.execute(JobGraphRunningUtil.java:57)
> 2022-09-26T10:56:44.0773534Z Sep 26 10:56:44  at 
> org.apache.flink.test.runtime.BatchShuffleITCaseBase.executeJob(BatchShuffleITCaseBase.java:115)
> 2022-09-26T10:56:44.0774333Z Sep 26 10:56:44  at 
> org.apache.flink.test.runtime.HybridShuffleITCase.testHybridFullExchangesRestart(HybridShuffleITCase.java:59)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=41343&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29559) Disable Join Reorder for these tables whose statistics are unavailable

2022-10-10 Thread Yunhong Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yunhong Zheng updated FLINK-29559:
--
Description: For these tables without table statistics, we need to disable 
join reorder for these tables in batch mode.  (was: For these tables without 
table statistics, we need to disable join reorder for these tables.)

> Disable Join Reorder for these tables whose statistics are unavailable
> --
>
> Key: FLINK-29559
> URL: https://issues.apache.org/jira/browse/FLINK-29559
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Priority: Major
> Fix For: 1.17.0
>
>
> For these tables without table statistics, we need to disable join reorder 
> for these tables in batch mode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29559) Disable Join Reorder for these tables whose statistics are unavailable in batch mode

2022-10-10 Thread Yunhong Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yunhong Zheng updated FLINK-29559:
--
Summary: Disable Join Reorder for these tables whose statistics are 
unavailable in batch mode  (was: Disable Join Reorder for these tables whose 
statistics are unavailable)

> Disable Join Reorder for these tables whose statistics are unavailable in 
> batch mode
> 
>
> Key: FLINK-29559
> URL: https://issues.apache.org/jira/browse/FLINK-29559
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Priority: Major
> Fix For: 1.17.0
>
>
> For these tables without table statistics, we need to disable join reorder 
> for these tables in batch mode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] lsyldliu commented on a diff in pull request #20652: [FLINK-22315][table] Support add/modify column/constraint/watermark for ALTER TABLE statement

2022-10-10 Thread GitBox


lsyldliu commented on code in PR #20652:
URL: https://github.com/apache/flink/pull/20652#discussion_r991742960


##
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java:
##
@@ -72,4 +87,14 @@ public Optional getWatermark() {
 public List getConstraints() {
 return constraints;
 }
+
+public List getFullConstraint() {

Review Comment:
   It would be better to return `Optional` directly.



##
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java:
##
@@ -61,6 +67,15 @@ public List getOperandList() {
 watermark);
 }
 
+@Override
+public void validate() throws SqlValidateException {
+List columns = new ArrayList<>();
+for (SqlNode columnPos : columnList) {
+columns.add(((SqlTableColumnPosition) columnPos).getColumn());
+}
+SqlConstraintValidator.validate(constraints, new SqlNodeList(columns, 
SqlParserPos.ZERO));

Review Comment:
   The code at line 73 and line 95 seems to be duplicated, so I think we can 
extract it into a shared method.



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterTableSchemaUtil.java:
##
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterTableAdd;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.ddl.SqlWatermark;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Util to convert {@link SqlAlterTableSchema} with source table to generate 
new {@link Schema}. */
+public class AlterTableSchemaUtil {
+
+private final SqlValidator sqlValidator;
+private final Consumer validateTableConstraint;
+private final Function escapeExpression;
+
+AlterTableSchemaUtil(
+SqlValidator sqlValidator,
+Function escapeExpression,
+Consumer validateTableConstraint) {
+this.sqlValidator = sqlValidator;
+this.validateTableConstraint = validateTableConstraint;
+this.escapeExpression = escapeExpression;
+}
+
+public Schema convertSchema(
+SqlAlterTableSchema alterTableSchema, ResolvedCatalogTable 
originalTable) {
+UnresolvedSchemaBuilder builder =
+new UnresolvedSchemaBuilder(
+originalTable,
+(FlinkTypeFactory) sqlValidator.getTypeFactory(),
+sqlValidator,
+validateTableConstraint,
+escapeExpression);
+AlterSchemaStrategy strategy =
+alterTableSchema instanceof SqlAlterTableAdd

Review Comment:
   Assuming there exist other classes such as `SqlAlterTableDrop`, 
`SqlAlterTableRenameColumn` t

[GitHub] [flink] hehuiyuan closed pull request #20995: [FLINK-29553][table]Support UNIX_TIMESTAMP built-in function in Table API

2022-10-10 Thread GitBox


hehuiyuan closed pull request #20995: [FLINK-29553][table]Support 
UNIX_TIMESTAMP built-in function in Table API
URL: https://github.com/apache/flink/pull/20995


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fsk119 commented on pull request #20931: [FLINK-29486][sql-client] Implement ClientParser for implementing remote SQL client later

2022-10-10 Thread GitBox


fsk119 commented on PR #20931:
URL: https://github.com/apache/flink/pull/20931#issuecomment-1273985394

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on a diff in pull request #20987: [FLINK-29432][Connector/Hive] Remove use of GenericNVL and use Colaese

2022-10-10 Thread GitBox


luoyuxia commented on code in PR #20987:
URL: https://github.com/apache/flink/pull/20987#discussion_r991746653


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserTypeCheckProcFactory.java:
##
@@ -1216,7 +1216,7 @@ protected ExprNodeDesc getXpathOrFuncExprNodeDesc(
 // Rewrite CASE into NVL
 desc =
 ExprNodeGenericFuncDesc.newInstance(
-new GenericUDFNvl(),
+new GenericUDFCoalesce(),

Review Comment:
   I'm afraid that changing it may bring other bugs, as reported in 
[HIVE-24902](https://issues.apache.org/jira/browse/HIVE-24902).
   Also, I still prefer not to change it, as I commented in the jira 
[FLINK-29432](https://issues.apache.org/jira/browse/FLINK-29432).
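
   For context, a minimal hedged sketch of why the rewrite itself is mechanical (the class name and constant children below are made up for illustration; it assumes the Hive `ExprNodeGenericFuncDesc` / `GenericUDFCoalesce` APIs already used in the diff above): `NVL(a, b)` and `COALESCE(a, b)` both return the first non-null argument, so only the UDF instance changes.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCoalesce;

public class NvlToCoalesceSketch {
    public static void main(String[] args) throws Exception {
        // Two-argument COALESCE has the same semantics as NVL: the first
        // non-null child is returned, so the rewrite only swaps the UDF.
        List<ExprNodeDesc> children = new ArrayList<>();
        children.add(new ExprNodeConstantDesc("a"));
        children.add(new ExprNodeConstantDesc("b"));
        ExprNodeDesc coalesceDesc =
                ExprNodeGenericFuncDesc.newInstance(new GenericUDFCoalesce(), children);
        System.out.println(coalesceDesc.getExprString());
    }
}
```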



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-29483) flink python udf arrow in thread model bug

2022-10-10 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-29483:
---

Assignee: jackylau

> flink python udf arrow in thread model bug
> --
>
> Key: FLINK-29483
> URL: https://issues.apache.org/jira/browse/FLINK-29483
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.0, 1.15.2
>Reporter: jackylau
>Assignee: jackylau
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.15.3, 1.16.1
>
> Attachments: image-2022-09-30-17-03-05-005.png
>
>
> !image-2022-09-30-17-03-05-005.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29432) Replace GenericUDFNvl with GenericUDFCoalesce

2022-10-10 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615410#comment-17615410
 ] 

luoyuxia commented on FLINK-29432:
--

Thanks for the contribution. But to put it briefly, I prefer not to change it 
immediately.

Actually, HIVE-20961 is a patch for Hive 4.0. Hive 4.0 is not released and Flink 
doesn't provide official support for Hive 4. Of course it should be fixed if we 
want to support Hive 4, but at least for now we seem to have no such plan in the 
short term.

Also, we can't simply replace `GenericUDFNvl` with `GenericUDFCoalesce` to fix it, 
for it may bring other bugs to the Hive dialect as reported in 
[HIVE-24902|https://issues.apache.org/jira/browse/HIVE-24902]. I also suspect it 
may bring other bugs that haven't been found yet.

For your problem, you can make the change in your own Flink distribution.

 

> Replace GenericUDFNvl with GenericUDFCoalesce
> -
>
> Key: FLINK-29432
> URL: https://issues.apache.org/jira/browse/FLINK-29432
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Affects Versions: 1.15.2
>Reporter: Prabhu Joseph
>Priority: Major
>  Labels: pull-request-available
>
> Hive NVL() function has many issues like 
> [HIVE-25193|https://issues.apache.org/jira/browse/HIVE-25193] and it is 
> retired [HIVE-20961|https://issues.apache.org/jira/browse/HIVE-20961]. Our 
> internal hive distribution has the fix for HIVE-20961. With this fix, Flink 
> Build is failing with below as there is no more GenericUDFNvl in Hive. This 
> needs to be replaced with GenericUDFCoalesce.
> {code}
> [INFO] 
> /codebuild/output/src366217558/src/build/flink/rpm/BUILD/flink-1.15.2/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserDefaultGraphWalker.java:
>  Recompile with -Xlint:unchecked for details.
> [INFO] -
> [ERROR] COMPILATION ERROR : 
> [INFO] -
> [ERROR] 
> /codebuild/output/src366217558/src/build/flink/rpm/BUILD/flink-1.15.2/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserTypeCheckProcFactory.java:[75,45]
>  cannot find symbol
>   symbol:   class GenericUDFNvl
>   location: package org.apache.hadoop.hive.ql.udf.generic
> [ERROR] 
> /codebuild/output/src366217558/src/build/flink/rpm/BUILD/flink-1.15.2/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserTypeCheckProcFactory.java:[1216,41]
>  cannot find symbol
>   symbol:   class GenericUDFNvl
>   location: class 
> org.apache.flink.table.planner.delegation.hive.HiveParserTypeCheckProcFactory.DefaultExprProcessor
> [ERROR] 
> /codebuild/output/src366217558/src/build/flink/rpm/BUILD/flink-1.15.2/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSemanticAnalyzer.java:[231,26]
>  constructor GlobalLimitCtx in class 
> org.apache.hadoop.hive.ql.parse.GlobalLimitCtx cannot be applied to given 
> types;
>   required: org.apache.hadoop.hive.conf.HiveConf
>   found: no arguments
>   reason: actual and formal argument lists differ in length
> [INFO] 3 errors
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29575) Can't Add Temporary Function USING JAR

2022-10-10 Thread Angelo Kastroulis (Jira)
Angelo Kastroulis created FLINK-29575:
-

 Summary: Can't Add Temporary Function USING JAR
 Key: FLINK-29575
 URL: https://issues.apache.org/jira/browse/FLINK-29575
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.15.1
Reporter: Angelo Kastroulis


In the create documentation for Flink SQL it says:


{code:java}
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION 
  [IF NOT EXISTS] [catalog_name.][db_name.]function_name 
  AS identifier [LANGUAGE JAVA|SCALA|PYTHON] 
  [USING JAR '.jar' [, JAR '.jar']* ] {code}
However, this:
{code:java}
CREATE TEMPORARY SYSTEM FUNCTION MyFunc AS
'com.ballista.my.classpath.MyFunc' LANGUAGE SCALA 
USING JAR 'path-to-my.jar'{code}
Throws this exception:
{code:java}
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "USING" at line 3, 
column 3.
Was expecting one of:
     
    ";" ... {code}

This works:
{code:java}
ADD JAR 'path-to-my.jar'; 
CREATE TEMPORARY SYSTEM FUNCTION MyFunc AS
'com.ballista.my.classpath.MyFunc' LANGUAGE SCALA;{code}

Is this a bug in the code, or is the documentation perhaps not up to date?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #156: [FLINK-29323] Refine Transformer for VectorAssembler

2022-10-10 Thread GitBox


yunfengzhou-hub commented on code in PR #156:
URL: https://github.com/apache/flink-ml/pull/156#discussion_r991744425


##
flink-ml-python/pyflink/ml/lib/feature/vectorassembler.py:
##
@@ -31,17 +33,38 @@ class _VectorAssemblerParams(
 Params for :class:`VectorAssembler`.
 """
 
+INPUT_SIZES: Param[Tuple[int, ...]] = IntArrayParam(
+"input_sizes",
+"Sizes of the assembling elements.",

Review Comment:
   Let's keep the description the same as that in Java.



##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java:
##
@@ -47,10 +47,15 @@
 
 /**
  * A Transformer which combines a given list of input columns into a vector 
column. Types of input
- * columns must be either vector or numerical value.
+ * columns must be either vector or numerical types. The elements assembled in 
the same column must
+ * have the same size. If the element is null or has the wrong size, we will 
process this case with

Review Comment:
   Let's avoid writing the JavaDoc using the first-person perspective. For 
example, instead of saying "we will process", let's say "the operator deals 
with null values or records with wrong sizes according to the strategy 
specified by the {@link HasHandleInvalid} parameter as follows".



##
docs/content/docs/operators/feature/vectorassembler.md:
##
@@ -44,11 +50,12 @@ Types of input columns must be either vector or numerical 
value.
 
 ### Parameters
 
-| Key   | Default| Type | Required | Description    |
-|---||--|--||
-| inputCols | `null` | String[] | yes  | Input column names.    |
-| outputCol | `"output"` | String   | no   | Output column name.    |
-| handleInvalid | `"error"`  | String   | no   | Strategy to handle invalid entries. Supported values: 'error', 'skip', 'keep'. |
+| Key | Default| Type  | Required | Description|
+|-||---|--||
+| inputCols   | `null` | String[]  | yes  | Input column names.|
+| outputCol   | `"output"` | String| no   | Output column name.|
+| inputSizes  | `null` | Integer[] | yes  | Sizes of the assembling elements.  |

Review Comment:
   Let's keep the description the same as that in Java.
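
   For reference, a hedged usage sketch of the parameters in the table above; `setInputSizes` is the setter this PR is assumed to add, while the other setters already exist on `VectorAssembler`:

```java
import org.apache.flink.ml.feature.vectorassembler.VectorAssembler;

public class VectorAssemblerConfigSketch {
    public static void main(String[] args) {
        VectorAssembler assembler =
                new VectorAssembler()
                        .setInputCols("vec", "num")   // input column names
                        .setInputSizes(2, 1)          // sizes of the assembling elements (assumed setter from this PR)
                        .setOutputCol("assembled")    // output column name
                        .setHandleInvalid("keep");    // 'error', 'skip' or 'keep'
        // Print the configured parameters for inspection.
        System.out.println(assembler.getParamMap());
    }
}
```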



##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java:
##
@@ -47,10 +47,15 @@
 
 /**
  * A Transformer which combines a given list of input columns into a vector 
column. Types of input
- * columns must be either vector or numerical value.
+ * columns must be either vector or numerical types. The elements assembled in 
the same column must
+ * have the same size. If the element is null or has the wrong size, we will 
process this case with
+ * {@link HasHandleInvalid} parameter as follows:
  *
- * The `keep` option of {@link HasHandleInvalid} means that we output bad 
rows with output column
- * set to null.
+ * The `keep` option means that we do not check the vector size, and keep 
all rows.

Review Comment:
   If the size does not match the expected, what would be the output value of 
the row?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on pull request #20999: [FLINK-29408][hive] fix HiveCatalogITCase fail with NPE

2022-10-10 Thread GitBox


luoyuxia commented on PR #20999:
URL: https://github.com/apache/flink/pull/20999#issuecomment-1273971517

   The `load data inpath` statement in `HiveDialectQueryITCase` will **move** `/csv/test.csv` to another place, so other tests will then get `null` from `getClass().getResource("/csv/test.csv")`.
   So I changed it to the `load data local inpath` statement, which will **copy** `/csv/test.csv`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21010: [FLINK-29495][Connector/Pulsar] Refactor FailsOnJava11 class for supporting both JUnit4 and JUnit5.

2022-10-10 Thread GitBox


flinkbot commented on PR #21010:
URL: https://github.com/apache/flink/pull/21010#issuecomment-1273971304

   
   ## CI report:
   
   * 19da765ddf23e66964531ff98b7bb5691d67ae60 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] syhily opened a new pull request, #21010: [FLINK-29495][Connector/Pulsar] Refactor FailsOnJava11 class for supporting both JUnit4 and JUnit5.

2022-10-10 Thread GitBox


syhily opened a new pull request, #21010:
URL: https://github.com/apache/flink/pull/21010

   ## What is the purpose of the change
   
   There are known issues with Pulsar on Java 11, which is why we want to 
disable test runs on Java 11. However, the tests still use the JUnit4 
annotation for disabling a test on Java 11, which doesn't work. This PR changes 
that annotation to the JUnit5 equivalent.
   
   ## Brief change log
   
   Change the `FailsOnJava11` interface to a JUnit5 annotation (see the sketch below).
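
   A rough sketch of what such a JUnit5-style marker could look like; this is a hypothetical illustration built on Jupiter's `@DisabledOnJre` condition, not necessarily the exact Flink implementation:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.junit.jupiter.api.condition.DisabledOnJre;
import org.junit.jupiter.api.condition.JRE;

/** Marks tests that are known to fail on Java 11 so that JUnit5 skips them there. */
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@DisabledOnJre(JRE.JAVA_11)
public @interface FailsOnJava11 {}
```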
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] bzhaoopenstack commented on pull request #20498: [FLINK-28829][k8s] Support prepreparing K8S resources before JM creation

2022-10-10 Thread GitBox


bzhaoopenstack commented on PR #20498:
URL: https://github.com/apache/flink/pull/20498#issuecomment-1273954844

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29545) kafka consuming stop when trigger first checkpoint

2022-10-10 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-29545:
--
Description: 
The task DAG is like the attached file. The task is started to consume from the 
earliest offset, and it stops when the first checkpoint triggers.

Is that normal? The sink is busy 0 and the second operator has 100 backpressure.

Checking the checkpoint summary, we can find that some of the subtasks are n/a.

I tried to debug this issue and found that in triggerCheckpointAsync, the 
triggerCheckpointAsyncInMailbox took a lot of time to be called.

This looks like it has something to do with logCheckpointProcessingDelay. Is there 
any fix for this issue?

Can anybody help me with this issue?

Thanks

  was:
the task dag is like attached file. when the task is started to consume from 
earliest offset, it will stop when the first checkpoint triggers.

 

is it normal?, for sink is busy 0 and the second operator has 100 backpressure

 

and check the checkpoint summary, we can find some of the sub task is n/a.

I tried to debug this issue and found in the 

triggerCheckpointAsync , the 

triggerCheckpointAsyncInMailbox took  a lot time to call

 

 

looks like this has something to do with 

logCheckpointProcessingDelay, Has any fix on this issue?

 

 

can anybody help me on this issue?

 

thanks


> kafka consuming stop when trigger first checkpoint
> --
>
> Key: FLINK-29545
> URL: https://issues.apache.org/jira/browse/FLINK-29545
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Network
>Affects Versions: 1.13.3
>Reporter: xiaogang zhou
>Priority: Critical
> Attachments: backpressure 100 busy 0.png, task acknowledge na.png, 
> task dag.png
>
>
> the task dag is like attached file. the task is started to consume from 
> earliest offset, it will stop when the first checkpoint triggers.
>  
> is it normal?, for sink is busy 0 and the second operator has 100 backpressure
>  
> and check the checkpoint summary, we can find some of the sub task is n/a.
> I tried to debug this issue and found in the 
> triggerCheckpointAsync , the 
> triggerCheckpointAsyncInMailbox took  a lot time to call
>  
>  
> looks like this has something to do with 
> logCheckpointProcessingDelay, Has any fix on this issue?
>  
>  
> can anybody help me on this issue?
>  
> thanks



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-24302) Direct buffer memory leak on Pulsar connector with Java 11

2022-10-10 Thread Yufan Sheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615386#comment-17615386
 ] 

Yufan Sheng edited comment on FLINK-24302 at 10/11/22 12:35 AM:


Supporting flink-connector-pulsar on Java 11 seems harder than expected. Although 
Pulsar will add a memory limit on both the Consumer and Producer API, the direct 
buffer memory still leaks in the Admin API. We have to wait for Pulsar to finally 
release a fixed client, but there is no ETA for this.

https://github.com/apache/pulsar/issues/17989


was (Author: syhily):
Support flink-connector-pulsar on Java 11 seems harder than expect. Although 
Pulsar will add memory limit on both Consumer and Producer API. The direct 
buffer memory still leaks on Admin API. We have to wait Pulsar finally release 
a fixed client. But there is no ETA for this.

> Direct buffer memory leak on Pulsar connector with Java 11
> --
>
> Key: FLINK-24302
> URL: https://issues.apache.org/jira/browse/FLINK-24302
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Pulsar
>Affects Versions: 1.14.0
>Reporter: Yufan Sheng
>Priority: Major
>  Labels: test-stability
>
> Running the Pulsar connector with multiple split readers on Java 11 could 
> throw {{a java.lang.OutOfMemoryError exception}}.
> {code:java}
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException:
>  Could not complete the operation. Number of retries has been exhausted. 
> Failed reason: java.lang.OutOfMemoryError: Direct buffer memory
>   at 
> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
>   at 
> java.base/java.util.concurrent.CompletableFuture$OrApply.tryFire(CompletableFuture.java:1503)
>   ... 42 more
> Caused by: 
> org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException:
>  Could not complete the operation. Number of retries has been exhausted. 
> Failed reason: java.lang.OutOfMemoryError: Direct buffer memory
>   at 
> org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$retryOperation$4(AsyncHttpConnector.java:249)
>   ... 39 more
> Caused by: org.apache.pulsar.shade.io.netty.handler.codec.EncoderException: 
> java.lang.OutOfMemoryError: Direct buffer memory
>   at 
> org.apache.pulsar.shade.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:104)
>   at 
> org.apache.pulsar.shade.io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:346)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
>   at 
> org.apache.pulsar.shade.io.netty.handler.stream.ChunkedWriteHandler.doFlush(ChunkedWriteHandler.java:303)
>   at 
> org.apache.pulsar.shade.io.netty.handler.stream.ChunkedWriteHandler.flush(ChunkedWriteHandler.java:132)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
>   at 
> org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1020)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:311)
>   at 
> org.apache.pulsar.shade.org.asynchttpclient.netty.request.NettyRequestSender.writeRequest(NettyRequestSender.java:420)
>   ... 23 more
> {code}
> The reason is that under Java 11, the Netty will allocate memory from the 
> pool of Java Direct Memory and is affected by the MaxDirectMemory limit. 
> Under Java 8, it allocates native memory and is not affected by that setting.
> We have to reduce the direct memory usage by using a newer Pulsar client 
> which has a 

[jira] [Commented] (FLINK-24302) Direct buffer memory leak on Pulsar connector with Java 11

2022-10-10 Thread Yufan Sheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615386#comment-17615386
 ] 

Yufan Sheng commented on FLINK-24302:
-

Supporting flink-connector-pulsar on Java 11 seems harder than expected. Although 
Pulsar will add a memory limit on both the Consumer and Producer API, the direct 
buffer memory still leaks in the Admin API. We have to wait for Pulsar to finally 
release a fixed client, but there is no ETA for this.
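
A hedged illustration of the client-level memory limit mentioned above (the 
PIP-74 setting available in newer Pulsar clients; the service URL below is a 
placeholder, and note that Admin API calls are not covered by this limit):

{code:java}
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;

public class PulsarMemoryLimitSketch {
    public static void main(String[] args) throws PulsarClientException {
        // Caps the buffer memory used by producers/consumers of this client.
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .memoryLimit(64, SizeUnit.MEGA_BYTES)
                .build();
        client.close();
    }
}
{code}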

> Direct buffer memory leak on Pulsar connector with Java 11
> --
>
> Key: FLINK-24302
> URL: https://issues.apache.org/jira/browse/FLINK-24302
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Pulsar
>Affects Versions: 1.14.0
>Reporter: Yufan Sheng
>Priority: Major
>  Labels: test-stability
>
> Running the Pulsar connector with multiple split readers on Java 11 could 
> throw {{a java.lang.OutOfMemoryError exception}}.
> {code:java}
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException:
>  Could not complete the operation. Number of retries has been exhausted. 
> Failed reason: java.lang.OutOfMemoryError: Direct buffer memory
>   at 
> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
>   at 
> java.base/java.util.concurrent.CompletableFuture$OrApply.tryFire(CompletableFuture.java:1503)
>   ... 42 more
> Caused by: 
> org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException:
>  Could not complete the operation. Number of retries has been exhausted. 
> Failed reason: java.lang.OutOfMemoryError: Direct buffer memory
>   at 
> org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$retryOperation$4(AsyncHttpConnector.java:249)
>   ... 39 more
> Caused by: org.apache.pulsar.shade.io.netty.handler.codec.EncoderException: 
> java.lang.OutOfMemoryError: Direct buffer memory
>   at 
> org.apache.pulsar.shade.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:104)
>   at 
> org.apache.pulsar.shade.io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:346)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
>   at 
> org.apache.pulsar.shade.io.netty.handler.stream.ChunkedWriteHandler.doFlush(ChunkedWriteHandler.java:303)
>   at 
> org.apache.pulsar.shade.io.netty.handler.stream.ChunkedWriteHandler.flush(ChunkedWriteHandler.java:132)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
>   at 
> org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1020)
>   at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:311)
>   at 
> org.apache.pulsar.shade.org.asynchttpclient.netty.request.NettyRequestSender.writeRequest(NettyRequestSender.java:420)
>   ... 23 more
> {code}
> The reason is that under Java 11, the Netty will allocate memory from the 
> pool of Java Direct Memory and is affected by the MaxDirectMemory limit. 
> Under Java 8, it allocates native memory and is not affected by that setting.
> We have to reduce the direct memory usage by using a newer Pulsar client 
> which has a memory-limits configuration.
> This issue is addressed on Pulsar, and 
> [PIP-74|https://github.com/apache/pulsar/wiki/PIP-74%3A-Pulsar-client-memory-limits]
>  has been created for resolving this issue.
> We should keep this issue open with no resolved versions until Pulsar 
> provides a new client with memory limits.
> h2. Update: 2022/08/04
> The memory limit on consumer API has been released 
> ht

[GitHub] [flink] flinkbot commented on pull request #21009: [FLINK-29574] Upgrade software.amazon.glue:schema-registry-common and…

2022-10-10 Thread GitBox


flinkbot commented on PR #21009:
URL: https://github.com/apache/flink/pull/21009#issuecomment-1273892680

   
   ## CI report:
   
   * f378d869c73dc9253d151971c9292b11400d20eb UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29574) Upgrade software.amazon.glue:schema-registry-common and software.amazon.glue:schema-registry-serde dependency from 1.1.8 to 1.1.14

2022-10-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29574:
---
Labels: pull-request-available  (was: )

> Upgrade software.amazon.glue:schema-registry-common and 
> software.amazon.glue:schema-registry-serde dependency from 1.1.8 to 1.1.14
> --
>
> Key: FLINK-29574
> URL: https://issues.apache.org/jira/browse/FLINK-29574
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Hong Liang Teoh
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> We should update the software.amazon.glue:schema-registry-common and 
> software.amazon.glue:schema-registry-serde dependencies from 1.1.8 to 1.1.14 
> to be up to date with the latest version



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] hlteoh37 opened a new pull request, #21009: [FLINK-29574] Upgrade software.amazon.glue:schema-registry-common and…

2022-10-10 Thread GitBox


hlteoh37 opened a new pull request, #21009:
URL: https://github.com/apache/flink/pull/21009

   … software.amazon.glue:schema-registry-serde dependency from 1.1.8 to 1.1.14
   
   ## What is the purpose of the change
   
   - Update Glue schema registry dependencies
   
   ## Brief change log
   
- Updated POM file
- Fixed unit test to account for signature change
   
   ## Verifying this change
   - Unit tests/e2e tests
   - Manual regression 
 - `flink-glue-schema-registry-avro-test` :: `mvn clean test 
-Prun-end-to-end-tests` (providing GSR keys)
 - `flink-glue-schema-registry-json-test` :: `mvn clean test 
-Prun-end-to-end-tests` (providing GSR keys)
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-29574) Upgrade software.amazon.glue:schema-registry-common and software.amazon.glue:schema-registry-serde dependency from 1.1.8 to 1.1.14

2022-10-10 Thread Hong Liang Teoh (Jira)
Hong Liang Teoh created FLINK-29574:
---

 Summary: Upgrade software.amazon.glue:schema-registry-common and 
software.amazon.glue:schema-registry-serde dependency from 1.1.8 to 1.1.14
 Key: FLINK-29574
 URL: https://issues.apache.org/jira/browse/FLINK-29574
 Project: Flink
  Issue Type: Technical Debt
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Hong Liang Teoh
 Fix For: 1.17.0


We should update the software.amazon.glue:schema-registry-common and 
software.amazon.glue:schema-registry-serde dependencies from 1.1.8 to 1.1.14 to 
be up to date with the latest version



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29495) PulsarSinkE2ECase hang

2022-10-10 Thread Yufan Sheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615377#comment-17615377
 ] 

Yufan Sheng commented on FLINK-29495:
-

[~martijnvisser] Thanks, but how can we know your modification works without 
running it on the nightly build?

> PulsarSinkE2ECase hang
> --
>
> Key: FLINK-29495
> URL: https://issues.apache.org/jira/browse/FLINK-29495
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.16.0, 1.17.0, 1.15.2
>Reporter: Xingbo Huang
>Assignee: Yufan Sheng
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> {code:java}
> 2022-10-02T05:53:56.0611489Z "main" #1 prio=5 os_prio=0 cpu=5171.60ms 
> elapsed=9072.82s tid=0x7f9508028000 nid=0x54ef1 waiting on condition  
> [0x7f950f994000]
> 2022-10-02T05:53:56.0612041Zjava.lang.Thread.State: TIMED_WAITING 
> (parking)
> 2022-10-02T05:53:56.0612475Z  at 
> jdk.internal.misc.Unsafe.park(java.base@11.0.16.1/Native Method)
> 2022-10-02T05:53:56.0613302Z  - parking to wait for  <0x87d261f8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 2022-10-02T05:53:56.0613959Z  at 
> java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.16.1/LockSupport.java:234)
> 2022-10-02T05:53:56.0614661Z  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(java.base@11.0.16.1/AbstractQueuedSynchronizer.java:2123)
> 2022-10-02T05:53:56.0615428Z  at 
> org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue.poll(GrowableArrayBlockingQueue.java:203)
> 2022-10-02T05:53:56.0616165Z  at 
> org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.internalReceive(MultiTopicsConsumerImpl.java:370)
> 2022-10-02T05:53:56.0616807Z  at 
> org.apache.pulsar.client.impl.ConsumerBase.receive(ConsumerBase.java:198)
> 2022-10-02T05:53:56.0617486Z  at 
> org.apache.flink.connector.pulsar.testutils.sink.PulsarPartitionDataReader.poll(PulsarPartitionDataReader.java:72)
>  {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=41526&view=logs&j=6e8542d7-de38-5a33-4aca-458d6c87066d&t=5846934b-7a4f-545b-e5b0-eb4d8bda32e1



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29395) [Kinesis][EFO] Issue using EFO consumer at timestamp with empty shard

2022-10-10 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh updated FLINK-29395:

Description: 
*Background*

The consumer fails when an EFO record publisher uses a timestamp sentinel 
starting position, the first record batch is not empty, but the first 
deaggregated record batch is empty. This can happen if the user explicitly 
specifies the hashkey in the KPL, and does not ensure that the explicitHashKey 
of every record in the aggregated batch is the same.

When resharding occurs, the aggregated record batch can have records that are 
out of the shard's hash key range. This causes the records to be dropped when 
deaggregating, and can result in this situation, where record batch is not 
empty, but the deaggregated record batch is empty.

The symptom seen is similar to the issue seen in 
https://issues.apache.org/jira/browse/FLINK-20088.

See 
[here|https://github.com/awslabs/kinesis-aggregation/blob/master/potential_data_loss.md]
 and [here|https://github.com/awslabs/kinesis-aggregation/issues/11] for a more 
detailed explanation

*Replicate*

Get shard information
{code:java}
aws kinesis describe-stream --stream-name 
{
    "StreamDescription": {
        "Shards": [
            ...
            {
                "ShardId": "shardId-0037",
                "ParentShardId": "shardId-0027",
                "HashKeyRange": {
                    "StartingHashKey": 
"272225893536750770770699685945414569164",
                    "EndingHashKey": "340282366920938463463374607431768211455"
                }
...
            },
            {
                "ShardId": "shardId-0038",
                "ParentShardId": "shardId-0034",
                "AdjacentParentShardId": "shardId-0036",
                "HashKeyRange": {
                    "StartingHashKey": 
"204169420152563078078024764459060926873",
                    "EndingHashKey": "272225893536750770770699685945414569163"
                }
...
            }
        ]
...
    }
}{code}
Create an aggregate record with two records, each with explicit hash keys 
belonging to different shards
{code:java}
RecordAggregator aggregator = new RecordAggregator();
String record1 = "RECORD_1";
String record2 = "RECORD_2";
aggregator.addUserRecord("pk", "272225893536750770770699685945414569162", 
record1.getBytes());
aggregator.addUserRecord("pk", "272225893536750770770699685945414569165", 
record2.getBytes());

AmazonKinesis kinesisClient = AmazonKinesisClient.builder()
   .build();
kinesisClient.putRecord(aggregator.clearAndGet().toPutRecordRequest("EFOStreamTest"));
 {code}
Consume from the given stream while specifying a timestamp where the only record 
retrieved is the record above.

*Error*
{code:java}
java.lang.IllegalArgumentException: Unexpected sentinel type: 
AT_TIMESTAMP_SEQUENCE_NUM
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:72)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:356)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:123)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829) {code}
 

*Solution*

This is fixed by reusing the existing timestamp starting position in this 
condition.

  was:
*Background*

The consumer fails when an EFO record publisher uses a timestamp sentinel 
starting position, the first record batch is not empty, but the first 
deaggregated re

[jira] [Updated] (FLINK-29395) [Kinesis][EFO] Issue using EFO consumer at timestamp with empty shard

2022-10-10 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh updated FLINK-29395:

Description: 
*Background*

The consumer fails when an EFO record publisher uses a timestamp sentinel 
starting position, the first record batch is not empty, but the first 
deaggregated record batch is empty. This can happen if the user explicitly 
specifies the hashkey in the KPL, and does not ensure that the explicitHashKey 
of every record in the aggregated batch is the same.

When resharding occurs, the aggregated record batch can have records that are 
out of the shard's hash key range. This causes the records to be dropped when 
deaggregating, and can result in this situation, where record batch is not 
empty, but the deaggregated record batch is empty.

The symptom seen is similar to the issue seen in 
https://issues.apache.org/jira/browse/FLINK-20088.

See 
[here|https://github.com/awslabs/kinesis-aggregation/blob/master/potential_data_loss.md]
 and [here|https://github.com/awslabs/kinesis-aggregation/issues/11] for a more 
detailed explanation

*Replicate*

Get shard information
{code:java}
aws kinesis describe-stream --stream-name 
{
    "StreamDescription": {
        "Shards": [
            ...
            {
                "ShardId": "shardId-0037",
                "ParentShardId": "shardId-0027",
                "HashKeyRange": {
                    "StartingHashKey": 
"272225893536750770770699685945414569164",
                    "EndingHashKey": "340282366920938463463374607431768211455"
                }
...
            },
            {
                "ShardId": "shardId-0038",
                "ParentShardId": "shardId-0034",
                "AdjacentParentShardId": "shardId-0036",
                "HashKeyRange": {
                    "StartingHashKey": 
"204169420152563078078024764459060926873",
                    "EndingHashKey": "272225893536750770770699685945414569163"
                }
...
            }
        ]
...
    }
}{code}
Create an aggregate record with two records, each with explicit hash keys 
belonging to different shards
{code:java}
RecordAggregator aggregator = new RecordAggregator();
String record1 = "RECORD_1";
String record2 = "RECORD_2";
aggregator.addUserRecord("pk", "272225893536750770770699685945414569162", 
record1.getBytes());
aggregator.addUserRecord("pk", "272225893536750770770699685945414569165", 
record2.getBytes());

AmazonKinesis kinesisClient = AmazonKinesisClient.builder()
   .build();
kinesisClient.putRecord(aggregator.clearAndGet().toPutRecordRequest("EFOStreamTest"));
 {code}
Consume from given stream whilst specifying a Timestamp where the only record 
retrieved is an aggregate that belongs to multiple shards.

*Error*
{code:java}
java.lang.IllegalArgumentException: Unexpected sentinel type: 
AT_TIMESTAMP_SEQUENCE_NUM
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:72)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:356)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:123)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829) {code}
 

*Solution*

This is fixed by reusing the existing timestamp starting position in this 
condition.

  was:
*Background*

The consumer fails when an EFO record publisher uses a timestamp sentinel 
starting position, the first record batch is not empty, bu

[jira] [Updated] (FLINK-29395) [Kinesis][EFO] Issue using EFO consumer at timestamp with empty shard

2022-10-10 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh updated FLINK-29395:

Description: 
*Background*

The consumer fails when an EFO record publisher uses a timestamp sentinel 
starting position, the first record batch is not empty, but the first 
deaggregated record batch is empty. This can happen if the user explicitly 
specifies the hashkey in the KPL, and does not ensure that the explicitHashKey 
of every record in the aggregated batch is the same.

When resharding occurs, the aggregated record batch can have records that are 
out of the shard's hash key range. This causes the records to be dropped when 
deaggregating, and can result in this situation, where record batch is not 
empty, but the deaggregated record batch is empty.

The symptom seen is similar to the issue seen in 
https://issues.apache.org/jira/browse/FLINK-20088.

See 
[here|https://github.com/awslabs/kinesis-aggregation/blob/master/potential_data_loss.md]
 and [here|https://github.com/awslabs/kinesis-aggregation/issues/11] for a more 
detailed explanation

*Replicate*

Get shard information
{code:java}
aws kinesis describe-stream --stream-name 
{
    "StreamDescription": {
        "Shards": [
            ...
            {
                "ShardId": "shardId-0037",
                "ParentShardId": "shardId-0027",
                "HashKeyRange": {
                    "StartingHashKey": 
"272225893536750770770699685945414569164",
                    "EndingHashKey": "340282366920938463463374607431768211455"
                }
...
            },
            {
                "ShardId": "shardId-0038",
                "ParentShardId": "shardId-0034",
                "AdjacentParentShardId": "shardId-0036",
                "HashKeyRange": {
                    "StartingHashKey": 
"204169420152563078078024764459060926873",
                    "EndingHashKey": "272225893536750770770699685945414569163"
                }
...
            }
        ]
...
    }
}{code}
Create an aggregate record with two records, each with explicit hash keys 
belonging to different shards
{code:java}
RecordAggregator aggregator = new RecordAggregator();
String record1 = "RECORD_1";
String record2 = "RECORD_2";
aggregator.addUserRecord("pk", "272225893536750770770699685945414569162", 
record1.getBytes());
aggregator.addUserRecord("pk", "272225893536750770770699685945414569165", 
record2.getBytes());

AmazonKinesis kinesisClient = AmazonKinesisClient.builder()
   .build();
kinesisClient.putRecord(aggregator.clearAndGet().toPutRecordRequest("EFOStreamTest"));
 {code}
*Error*
{code:java}
java.lang.IllegalArgumentException: Unexpected sentinel type: 
AT_TIMESTAMP_SEQUENCE_NUM
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:72)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:356)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:123)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829) {code}
 

*Solution*

This is fixed by reusing the existing timestamp starting position in this 
condition.

  was:
*Background*

The consumer fails when an EFO record publisher uses a timestamp sentinel 
starting position, the first record batch is not empty, but the first 
deaggregated record batch is empty. This can happen if the user explicitly 
specifies the hashkey in the KPL, and does not ens

[jira] [Closed] (FLINK-29513) Update Kafka version to 3.2.3

2022-10-10 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-29513.
--
Fix Version/s: 1.16.0
   Resolution: Fixed

Also fixed in release-1.16 via 90fba507a824eab2075aa42b0a82cc0b69dc44de

> Update Kafka version to 3.2.3
> -
>
> Key: FLINK-29513
> URL: https://issues.apache.org/jira/browse/FLINK-29513
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.17.0
>
>
> Kafka 3.2.3 contains certain security fixes (see 
> https://downloads.apache.org/kafka/3.2.3/RELEASE_NOTES.html). We should 
> upgrade the dependency in Flink



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] MartijnVisser merged pull request #20977: [FLINK-29513][BP 1.16][Connector/Kafka] Update Kafka to version 3.2.3

2022-10-10 Thread GitBox


MartijnVisser merged PR #20977:
URL: https://github.com/apache/flink/pull/20977


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] ferenc-csaky commented on a diff in pull request #20987: [FLINK-29432][Connector/Hive] Remove use of GenericNVL and use Colaese

2022-10-10 Thread GitBox


ferenc-csaky commented on code in PR #20987:
URL: https://github.com/apache/flink/pull/20987#discussion_r991586643


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserTypeCheckProcFactory.java:
##
@@ -1216,7 +1216,7 @@ protected ExprNodeDesc getXpathOrFuncExprNodeDesc(
 // Rewrite CASE into NVL
 desc =
 ExprNodeGenericFuncDesc.newInstance(
-new GenericUDFNvl(),
+new GenericUDFCoalesce(),

Review Comment:
   I think it would be nice to adapt the comment @ line 1216 as well to keep it 
consistent. The change itself LGTM.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29495) PulsarSinkE2ECase hang

2022-10-10 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615279#comment-17615279
 ] 

Martijn Visser commented on FLINK-29495:


Test disabled in master via be32eddd3a775cc336412b1cca14ecfa522320ec

> PulsarSinkE2ECase hang
> --
>
> Key: FLINK-29495
> URL: https://issues.apache.org/jira/browse/FLINK-29495
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.16.0, 1.17.0, 1.15.2
>Reporter: Xingbo Huang
>Assignee: Yufan Sheng
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> {code:java}
> 2022-10-02T05:53:56.0611489Z "main" #1 prio=5 os_prio=0 cpu=5171.60ms 
> elapsed=9072.82s tid=0x7f9508028000 nid=0x54ef1 waiting on condition  
> [0x7f950f994000]
> 2022-10-02T05:53:56.0612041Zjava.lang.Thread.State: TIMED_WAITING 
> (parking)
> 2022-10-02T05:53:56.0612475Z  at 
> jdk.internal.misc.Unsafe.park(java.base@11.0.16.1/Native Method)
> 2022-10-02T05:53:56.0613302Z  - parking to wait for  <0x87d261f8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 2022-10-02T05:53:56.0613959Z  at 
> java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.16.1/LockSupport.java:234)
> 2022-10-02T05:53:56.0614661Z  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(java.base@11.0.16.1/AbstractQueuedSynchronizer.java:2123)
> 2022-10-02T05:53:56.0615428Z  at 
> org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue.poll(GrowableArrayBlockingQueue.java:203)
> 2022-10-02T05:53:56.0616165Z  at 
> org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.internalReceive(MultiTopicsConsumerImpl.java:370)
> 2022-10-02T05:53:56.0616807Z  at 
> org.apache.pulsar.client.impl.ConsumerBase.receive(ConsumerBase.java:198)
> 2022-10-02T05:53:56.0617486Z  at 
> org.apache.flink.connector.pulsar.testutils.sink.PulsarPartitionDataReader.poll(PulsarPartitionDataReader.java:72)
>  {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=41526&view=logs&j=6e8542d7-de38-5a33-4aca-458d6c87066d&t=5846934b-7a4f-545b-e5b0-eb4d8bda32e1



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] MartijnVisser merged pull request #21005: [FLINK-29495][Connector/Pulsar] Refactor Pulsar tests to disable them on Java 11

2022-10-10 Thread GitBox


MartijnVisser merged PR #21005:
URL: https://github.com/apache/flink/pull/21005


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29395) [Kinesis][EFO] Issue using EFO consumer at timestamp with empty shard

2022-10-10 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh updated FLINK-29395:

Description: 
*Background*

The consumer fails when an EFO record publisher uses a timestamp sentinel 
starting position, the first record batch is not empty, but the first 
deaggregated record batch is empty. This can happen if the user explicitly 
specifies the hashkey in the KPL, and does not ensure that the explicitHashKey 
of every record in the aggregated batch is the same.

When resharding occurs, the aggregated record batch can have records that are 
out of the shard's hash key range. This causes the records to be dropped when 
deaggregating, and can result in this situation, where record batch is not 
empty, but the deaggregated record batch is empty.

The symptom seen is similar to the issue seen in 
https://issues.apache.org/jira/browse/FLINK-20088.

See 
[here|https://github.com/awslabs/kinesis-aggregation/blob/master/potential_data_loss.md]
 and [here|https://github.com/awslabs/kinesis-aggregation/issues/11] for a more 
detailed explanation

 

*Error*
{code:java}
java.lang.IllegalArgumentException: Unexpected sentinel type: 
AT_TIMESTAMP_SEQUENCE_NUM
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:72)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:356)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:123)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829) {code}
 

*Solution*

This is fixed by reusing the existing timestamp starting position in this 
condition.

  was:
*Background*

The consumer fails when an EFO record publisher uses a timestamp sentinel 
starting position, the first record batch is not empty, but the first 
deaggregated record batch is empty. This can happen if the user explicitly 
specifies the hashkey in the KPL, and does not ensure that the explicitHashKey 
of every record in the aggregated batch is the same.

When resharding occurs, the aggregated record batch can have records that are 
out of the shard's hash key range. This causes the records to be dropped when 
deaggregating, and can result in this situation, where record batch is not 
empty, but the deaggregated record batch is empty.

The symptom seen is similar to the issue seen in 
https://issues.apache.org/jira/browse/FLINK-20088.

See 
[here|[http://example.com|https://github.com/awslabs/kinesis-aggregation/blob/master/potential_data_loss.md]]
 and [here|https://github.com/awslabs/kinesis-aggregation/issues/11] for a more 
detailed explanation

 

*Error*
{code:java}
java.lang.IllegalArgumentException: Unexpected sentinel type: 
AT_TIMESTAMP_SEQUENCE_NUM
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:72)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:356)

at 
org.apache.flink.streaming.conn

[jira] [Updated] (FLINK-29395) [Kinesis][EFO] Issue using EFO consumer at timestamp with empty shard

2022-10-10 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh updated FLINK-29395:

Description: 
*Background*

The consumer fails when an EFO record publisher uses a timestamp sentinel 
starting position, the first record batch is not empty, but the first 
deaggregated record batch is empty. This can happen if the user explicitly 
specifies the hashkey in the KPL, and does not ensure that the explicitHashKey 
of every record in the aggregated batch is the same.

When resharding occurs, the aggregated record batch can have records that are 
out of the shard's hash key range. This causes the records to be dropped when 
deaggregating, and can result in this situation, where record batch is not 
empty, but the deaggregated record batch is empty.

The symptom seen is similar to the issue seen in 
https://issues.apache.org/jira/browse/FLINK-20088.

See 
[here|[http://example.com|https://github.com/awslabs/kinesis-aggregation/blob/master/potential_data_loss.md]]
 and [here|https://github.com/awslabs/kinesis-aggregation/issues/11] for a more 
detailed explanation

 

*Error*
{code:java}
java.lang.IllegalArgumentException: Unexpected sentinel type: 
AT_TIMESTAMP_SEQUENCE_NUM
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:72)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:356)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:123)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829) {code}
 

*Solution*

This is fixed by reusing the existing timestamp starting position in this 
condition.

  was:
*Background*

The consumer fails when an EFO record publisher uses a timestamp sentinel 
starting position, the first record batch is not empty, but the first 
deaggregated record batch is empty. This can happen if the user explicitly 
specifies the hashkey in the KPL, and does not ensure that the explicitHashKey 
of every record in the aggregated batch is the same.

When resharding occurs, the aggregated record batch can have records that are 
out of the shard's hash key range. This causes the records to be dropped when 
deaggregating, and can result in this situation, where record batch is not 
empty, but the deaggregated record batch is empty.

The symptom seen is similar to the issue seen in 
https://issues.apache.org/jira/browse/FLINK-20088.

See here for a more detailed explanation 
[https://github.com/awslabs/kinesis-aggregation/blob/master/potential_data_loss.md.]

 

*Error*
{code:java}
java.lang.IllegalArgumentException: Unexpected sentinel type: 
AT_TIMESTAMP_SEQUENCE_NUM
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:72)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:356)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subsc

[jira] [Updated] (FLINK-29395) [Kinesis][EFO] Issue using EFO consumer at timestamp with empty shard

2022-10-10 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh updated FLINK-29395:

Description: 
*Background*

The consumer fails when an EFO record publisher uses a timestamp sentinel 
starting position, the first record batch is not empty, but the first 
deaggregated record batch is empty. This can happen if the user explicitly 
specifies the hashkey in the KPL, and does not ensure that the explicitHashKey 
of every record in the aggregated batch is the same.

When resharding occurs, the aggregated record batch can have records that are 
out of the shard's hash key range. This causes the records to be dropped when 
deaggregating, and can result in this situation, where record batch is not 
empty, but the deaggregated record batch is empty.

The symptom seen is similar to the issue seen in 
https://issues.apache.org/jira/browse/FLINK-20088.

See here for a more detailed explanation 
[https://github.com/awslabs/kinesis-aggregation/blob/master/potential_data_loss.md.]

 

*Error*
{code:java}
java.lang.IllegalArgumentException: Unexpected sentinel type: 
AT_TIMESTAMP_SEQUENCE_NUM
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:72)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:356)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:123)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829) {code}
 

*Solution*

This is fixed by reusing the existing timestamp starting position in this 
condition.

  was:
*Background*

The consumer fails when an EFO record publisher uses a timestamp sentinel 
starting position, the first record batch is not empty, but the first 
deaggregated record batch is empty. This can happen if the user explicitly 
specifies the hashkey in the KPL, and does not ensure that the explicitHashKey 
of every record in the aggregated batch is the same.

When resharding occurs, the aggregated record batch can have records that are 
out of the shard's hash key range. This causes the records to be dropped when 
deaggregating, and can result in this situation, where record batch is not 
empty, but the deaggregated record batch is empty.

The symptom seen is similar to the issue seen in 
https://issues.apache.org/jira/browse/FLINK-20088. 

*Reproduction Steps*

Setup an application consuming from Kinesis with following properties and 
consume from an empty shard:
{code:java}
String format = "-MM-dd'T'HH:mm:ss";
String date = new SimpleDateFormat(format).format(new Date());

consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, 
date);
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
 format);
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP"); 
consumerConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
"EFO"); {code}
*Error*
{code:java}
java.lang.IllegalArgumentException: Unexpected sentinel type: 
AT_TIMESTAMP_SEQUENCE_NUM
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:72)

at 
org.apache.flink.streaming.connectors.kines

[jira] [Updated] (FLINK-29395) [Kinesis][EFO] Issue using EFO consumer at timestamp with empty shard

2022-10-10 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh updated FLINK-29395:

Description: 
*Background*

The consumer fails when an EFO record publisher uses a timestamp sentinel 
starting position, the first record batch is not empty, but the first 
deaggregated record batch is empty. This can happen if the user explicitly 
specifies the hashkey in the KPL, and does not ensure that the explicitHashKey 
of every record in the aggregated batch is the same.

When resharding occurs, the aggregated record batch can have records that are 
out of the shard's hash key range. This causes the records to be dropped when 
deaggregating, and can result in this situation, where record batch is not 
empty, but the deaggregated record batch is empty.

The symptom seen is similar to the issue seen in 
https://issues.apache.org/jira/browse/FLINK-20088. 

*Reproduction Steps*

Setup an application consuming from Kinesis with following properties and 
consume from an empty shard:
{code:java}
String format = "-MM-dd'T'HH:mm:ss";
String date = new SimpleDateFormat(format).format(new Date());

consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, 
date);
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
 format);
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP"); 
consumerConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
"EFO"); {code}
*Error*
{code:java}
java.lang.IllegalArgumentException: Unexpected sentinel type: 
AT_TIMESTAMP_SEQUENCE_NUM
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:72)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:356)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:123)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829) {code}
 

*Solution*

This is fixed by reusing the existing timestamp starting position in this 
condition.

  was:
*Background*

The consumer fails when an EFO record publisher uses a timestamp sentinel 
starting position and the first record batch is empty. This is because the 
consumer tries to recalculate the start position from the timestamp sentinel, 
this operation is not supported.

This is the same issue as https://issues.apache.org/jira/browse/FLINK-20088

*Reproduction Steps*

Setup an application consuming from Kinesis with following properties and 
consume from an empty shard:
{code:java}
String format = "-MM-dd'T'HH:mm:ss";
String date = new SimpleDateFormat(format).format(new Date());

consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, 
date);
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
 format);
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP"); 
consumerConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
"EFO"); {code}
*Error*
{code:java}
java.lang.IllegalArgumentException: Unexpected sentinel type: 
AT_TIMESTAMP_SEQUENCE_NUM
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91)
at 
org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosi

[GitHub] [flink] mxm commented on a diff in pull request #20953: [FLINK-29501] Add option to override job vertex parallelisms during job submission

2022-10-10 Thread GitBox


mxm commented on code in PR #20953:
URL: https://github.com/apache/flink/pull/20953#discussion_r991532503


##
flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java:
##
@@ -64,22 +64,37 @@
  *
  * <ul>
  *   <li>typeClass == atomic class (e.g. {@code Integer.class}) -> {@code 
ConfigOption<Integer>}
- *   <li>typeClass == {@code Map.class} -> {@code ConfigOption<Map<String, String>>}
- *   <li>typeClass == atomic class and isList == true for {@code 
ConfigOption<List<Integer>>}
+ *   <li>typeClass == atomic class and type == LIST for {@code 
ConfigOption<List<Integer>>}

Review Comment:
   This has been moved to https://github.com/apache/flink/pull/21008, along 
with more tests. The list type is not sufficient for this kind of 
configuration. There is no logical reason to support many value types but 
not allow them as values in maps.
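
   For context, map-typed options are currently limited to string values, so a 
per-vertex parallelism override with the existing API would look roughly like the 
sketch below (the option key is only an example, not necessarily the one this PR 
introduces):
```java
import java.util.Collections;
import java.util.Map;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class ParallelismOverridesExample {

    // Map-typed options only support Map<String, String> today, so numeric
    // values have to be parsed by whoever reads the option.
    public static final ConfigOption<Map<String, String>> PARALLELISM_OVERRIDES =
            ConfigOptions.key("pipeline.jobvertex-parallelism-overrides") // example key
                    .mapType()
                    .defaultValue(Collections.emptyMap())
                    .withDescription("Map of job vertex id -> parallelism override.");

    public static int overrideFor(Configuration conf, String vertexId, int fallback) {
        String value = conf.get(PARALLELISM_OVERRIDES).get(vertexId);
        return value == null ? fallback : Integer.parseInt(value);
    }
}
```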



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] leletan commented on pull request #20852: [FLINK-27101][checkpointing][rest] Add restful API to trigger checkpoints

2022-10-10 Thread GitBox


leletan commented on PR #20852:
URL: https://github.com/apache/flink/pull/20852#issuecomment-1273650082

   Hi folks, @pnowojski @stevenzwu @zentol Thanks for your review again. I 
believe I have addressed all comments, except for the pending rename of the old 
CheckpointType class (I will do that in a later commit to keep the change simpler).
   One big change: I added one more CheckpointType, `CONFIGURED`, to 
represent triggering a checkpoint with the existing internal CheckpointSnapshotType 
as configured by the Flink application, replacing the previous legacy MiniCluster 
case that was represented by `null`. 
   
   Please let me know if there is anything else I am missing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29573) Flink does not work for windows if installing on other drive except for C drive

2022-10-10 Thread kris wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kris wang updated FLINK-29573:
--
Description: 
{{flink_service.bat}} --> {{flink_inner.bat}} --> {{start-cluster.sh}} --> {{config.sh}}
The {{config.sh}} which is under the _*D:\x\flink-1.14.4\bin*_ (Flink’s 
directory)
has {{runBashJavaUtilsCmd()}} --> the current derived value of 
{{class_path}} after mangling the Path is
*D;C:\x\flink-1.14.4\bin\bash-java-utils.jar;D:\PlatformAnalytics\flink-1.14.4\lib\flink-dist_2.12-1.14.4.jar*

Here, {{runBashJavaUtilsCmd()}} internally sets {{class_path}} by calling a 
function called {{manglePathList}} 
{{manglePathList}} is causing this behavior where
_:\x\flink-1.13.5/bin/bash-java-utils.jar_ is getting converted to 
_C:\x\flink-1.14.4\bin\bash-java-utils.jar_ 
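
For illustration, a toy Java snippet (not the actual {{config.sh}} logic) showing why 
splitting a Windows class path on ':' corrupts drive letters, which is consistent 
with the stray {{D;}} in the derived value above:
{code:java}
import java.util.Arrays;

public class DriveLetterSplitDemo {
    public static void main(String[] args) {
        String windowsClassPath =
                "D:\\x\\flink-1.14.4\\bin\\bash-java-utils.jar;"
                        + "D:\\x\\flink-1.14.4\\lib\\flink-dist_2.12-1.14.4.jar";

        // Treating ':' as the path-list separator (the Unix convention) cuts the
        // drive letter off every entry:
        System.out.println(Arrays.toString(windowsClassPath.split(":")));
        // -> [D, \x\flink-1.14.4\bin\bash-java-utils.jar;D, \x\flink-1.14.4\lib\...]

        // Using ';' (the Windows path-list separator) keeps the entries intact:
        System.out.println(Arrays.toString(windowsClassPath.split(";")));
    }
}
{code}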
 

  was:
{{flink_service.bat}} --> {{flink_inner.bat}} --> {{start-cluster.sh}} --> {{config.sh}}
The {{config.sh}} which is under the _*D:\x\flink-1.14.4\bin*_ (Flink’s 
directory)
has {{runBashJavaUtilsCmd()}} --> the current derived value of 
{{class_path}} after mangling the Path is
*D;C:\x\flink-1.14.4\bin\bash-java-utils.jar;D:\PlatformAnalytics\flink-1.14.4\lib\flink-dist_2.12-1.14.4.jar*

Here, {{runBashJavaUtilsCmd()}} internally sets {{class_path}} by calling a 
function called {{manglePathList}} 
{{manglePathList}} is causing this behavior where
_:\x\flink-1.13.5/bin/bash-java-utils.jar_ is getting converted to 
_C:\PlatformAnalytics\flink-1.14.4\bin\bash-java-utils.jar_ 
 


> Flink does not work for windows if installing on other drive except for C 
> drive
> ---
>
> Key: FLINK-29573
> URL: https://issues.apache.org/jira/browse/FLINK-29573
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration
>Affects Versions: 1.14.4
>Reporter: kris wang
>Priority: Major
>
> {{flink_service.bat}} --> {{flink_inner.bat}} --> {{start-cluster.sh}} --> {{config.sh}}
> The {{config.sh}} which is under the _*D:\x\flink-1.14.4\bin*_ (Flink’s 
> directory)
> has {{runBashJavaUtilsCmd()}} --> the current derived value of 
> {{class_path}} after mangling the Path is
> *D;C:\x\flink-1.14.4\bin\bash-java-utils.jar;D:\PlatformAnalytics\flink-1.14.4\lib\flink-dist_2.12-1.14.4.jar*
> Here, {{runBashJavaUtilsCmd()}} internally sets {{class_path}} by calling a 
> function called {{manglePathList}} 
> {{manglePathList}} is causing this behavior where
> _:\x\flink-1.13.5/bin/bash-java-utils.jar_ is getting converted to 
> _C:\x\flink-1.14.4\bin\bash-java-utils.jar_ 
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29573) Flink does not work for windows if installing on other drive except for C drive

2022-10-10 Thread kris wang (Jira)
kris wang created FLINK-29573:
-

 Summary: Flink does not work for windows if installing on other 
drive except for C drive
 Key: FLINK-29573
 URL: https://issues.apache.org/jira/browse/FLINK-29573
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Configuration
Affects Versions: 1.14.4
Reporter: kris wang


{{flink_service.bat}} --> {{flink_inner.bat}} --> {{start-cluster.sh}} --> {{config.sh}}
The {{config.sh}} which is under the _*D:\x\flink-1.14.4\bin*_ (Flink’s 
directory)
has {{runBashJavaUtilsCmd()}} --> the current derived value of 
{{class_path}} after mangling the Path is
*D;C:\x\flink-1.14.4\bin\bash-java-utils.jar;D:\PlatformAnalytics\flink-1.14.4\lib\flink-dist_2.12-1.14.4.jar*

Here, {{runBashJavaUtilsCmd()}} internally sets {{class_path}} by calling a 
function called {{manglePathList}} 
{{manglePathList}} is causing this behavior where
_:\x\flink-1.13.5/bin/bash-java-utils.jar_ is getting converted to 
_C:\PlatformAnalytics\flink-1.14.4\bin\bash-java-utils.jar_ 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29572) Flink Task Manager skip loopback interface for resource manager registration

2022-10-10 Thread Kevin Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Li updated FLINK-29572:
-
Description: 
Currently the Flink Task Manager tries different local interfaces when binding to 
connect to the Resource Manager. The first one is the loopback interface. Normally, 
if the Job Manager is running on a remote host/container, connecting via the 
loopback interface will fail and the Task Manager will pick up the correct IP 
address.

However, if the Task Manager is running with some proxy, the loopback interface can 
connect to the remote host as well. This results in 127.0.0.1 being reported to the 
Resource Manager during registration even though the Job Manager/Resource Manager 
runs on a remote host, and problems follow. For us, only one Task Manager can 
register in this case.

I suggest adding a configuration option to skip the loopback interface check when we 
know the Job/Resource Manager is running on a remote host/container.
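
For illustration, a rough sketch of what such an opt-out could look like; the flag 
and helper below are hypothetical and not an existing Flink option:
{code:java}
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Collections;

class NonLoopbackBindAddressSketch {

    // skipLoopback would be driven by a new configuration option, e.g. something
    // like "taskmanager.network.bind.skip-loopback" (hypothetical name).
    static InetAddress findBindAddress(boolean skipLoopback) throws Exception {
        for (NetworkInterface nic :
                Collections.list(NetworkInterface.getNetworkInterfaces())) {
            for (InetAddress address : Collections.list(nic.getInetAddresses())) {
                if (skipLoopback && address.isLoopbackAddress()) {
                    continue; // never report 127.0.0.1 to the ResourceManager
                }
                return address;
            }
        }
        throw new IllegalStateException("No suitable bind address found");
    }
}
{code}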

 

  was:
Currently Flink Task Manager use different local interface to bind to connect 
to Resource Manager. First one is Loopback interface. Normally if Job Manager 
is running on remote host/container, using loopback interface to connect will 
fail and it will pick up correct IP address.

 

However, if Task Manager is running with some proxy, loopback interface can 
connect to remote host as well. This will result 127.0.0.1 reported to Resource 
Manager during registration, even Job Manager/Resource Manager runs on remote 
host, and problem will happen. For us, only one Task Manager can register in 
this case.

 

 

 

I suggest adding configuration to skip Loopback interface check if we know 
Job/Resource Manager is running on remote host/container.

 


> Flink Task Manager skip loopback interface for resource manager registration
> 
>
> Key: FLINK-29572
> URL: https://issues.apache.org/jira/browse/FLINK-29572
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Affects Versions: 1.15.2
> Environment: Flink 1.15.2
> Kubernetes with Istio Proxy
>Reporter: Kevin Li
>Priority: Major
>
> Currently Flink Task Manager use different local interface to bind to connect 
> to Resource Manager. First one is Loopback interface. Normally if Job Manager 
> is running on remote host/container, using loopback interface to connect will 
> fail and it will pick up correct IP address.
> However, if Task Manager is running with some proxy, loopback interface can 
> connect to remote host as well. This will result 127.0.0.1 reported to 
> Resource Manager during registration, even Job Manager/Resource Manager runs 
> on remote host, and problem will happen. For us, only one Task Manager can 
> register in this case.
> I suggest adding configuration to skip Loopback interface check if we know 
> Job/Resource Manager is running on remote host/container.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29572) Flink Task Manager skip loopback interface for resource manager registration

2022-10-10 Thread Kevin Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615214#comment-17615214
 ] 

Kevin Li commented on FLINK-29572:
--

{quote}Task Manager Log:
2022-10-08 17:22:32,983 INFO  
org.apache.flink.runtime.util.LeaderRetrievalUtils   [] - Trying to 
select the network interface and address to use by connecting to the leading 
JobManager.
2022-10-08 17:22:32,984 INFO  
org.apache.flink.runtime.util.LeaderRetrievalUtils   [] - TaskManager 
will try to connect for PT10S before falling back to heuristics
2022-10-08 17:22:33,356 DEBUG org.apache.flink.runtime.net.ConnectionUtils  
   [] - Retrieved new target address 
flink-jobmanager/172.20.133.241:6123 for akka URL 
[akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*]
.
2022-10-08 17:22:33,357 DEBUG org.apache.flink.runtime.net.ConnectionUtils  
   [] - Trying to connect to [flink-jobmanager/172.20.133.241:6123] 
from local address [localhost/127.0.0.1] with timeout [100]
2022-10-08 17:22:33,361 DEBUG org.apache.flink.runtime.net.ConnectionUtils  
   [] - Using InetAddress.getLoopbackAddress() immediately for 
connecting address
2022-10-08 17:22:33,361 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - TaskManager 
will use hostname/address 'localhost' (127.0.0.1) for communication.
2022-10-08 17:22:33,416 INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils[] - Trying to 
start actor system, external address 127.0.0.1:6122, bind address 
0.0.0.0:6122.{quote}

> Flink Task Manager skip loopback interface for resource manager registration
> 
>
> Key: FLINK-29572
> URL: https://issues.apache.org/jira/browse/FLINK-29572
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Affects Versions: 1.15.2
> Environment: Flink 1.15.2
> Kubernetes with Istio Proxy
>Reporter: Kevin Li
>Priority: Major
>
> Currently Flink Task Manager use different local interface to bind to connect 
> to Resource Manager. First one is Loopback interface. Normally if Job Manager 
> is running on remote host/container, using loopback interface to connect will 
> fail and it will pick up correct IP address.
>  
> However, if Task Manager is running with some proxy, loopback interface can 
> connect to remote host as well. This will result 127.0.0.1 reported to 
> Resource Manager during registration, even Job Manager/Resource Manager runs 
> on remote host, and problem will happen. For us, only one Task Manager can 
> register in this case.
>  
>  
>  
> I suggest adding configuration to skip Loopback interface check if we know 
> Job/Resource Manager is running on remote host/container.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29572) Flink Task Manager skip loopback interface for resource manager registration

2022-10-10 Thread Kevin Li (Jira)
Kevin Li created FLINK-29572:


 Summary: Flink Task Manager skip loopback interface for resource 
manager registration
 Key: FLINK-29572
 URL: https://issues.apache.org/jira/browse/FLINK-29572
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Affects Versions: 1.15.2
 Environment: Flink 1.15.2

Kubernetes with Istio Proxy
Reporter: Kevin Li


Currently Flink Task Manager use different local interface to bind to connect 
to Resource Manager. First one is Loopback interface. Normally if Job Manager 
is running on remote host/container, using loopback interface to connect will 
fail and it will pick up correct IP address.

 

However, if Task Manager is running with some proxy, loopback interface can 
connect to remote host as well. This will result 127.0.0.1 reported to Resource 
Manager during registration, even Job Manager/Resource Manager runs on remote 
host, and problem will happen. For us, only one Task Manager can register in 
this case.

 

 

 

I suggest adding configuration to skip Loopback interface check if we know 
Job/Resource Manager is running on remote host/container.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29561) Log the job id when clean up session job failed

2022-10-10 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-29561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi closed FLINK-29561.
--
Resolution: Fixed

[{{8e57e5b}}|https://github.com/apache/flink-kubernetes-operator/commit/8e57e5b75af385c22dccaef85014175af35b47e8]
 in main

> Log the job id when clean up session job failed
> ---
>
> Key: FLINK-29561
> URL: https://issues.apache.org/jira/browse/FLINK-29561
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Xin Hao
>Assignee: Xin Hao
>Priority: Minor
>  Labels: pull-request-available
>
> If so, we can delete it by Flink rest API manually, and no need to query it 
> by name.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] mbalassi merged pull request #398: [FLINK-29561] Add job id in the cleanup job failed message.

2022-10-10 Thread GitBox


mbalassi merged PR #398:
URL: https://github.com/apache/flink-kubernetes-operator/pull/398


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] mbalassi commented on pull request #398: [FLINK-29561] Add job id in the cleanup job failed message.

2022-10-10 Thread GitBox


mbalassi commented on PR #398:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/398#issuecomment-1273572910

   LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29561) Log the job id when clean up session job failed

2022-10-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29561:
---
Labels: pull-request-available  (was: )

> Log the job id when clean up session job failed
> ---
>
> Key: FLINK-29561
> URL: https://issues.apache.org/jira/browse/FLINK-29561
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Xin Hao
>Assignee: Xin Hao
>Priority: Minor
>  Labels: pull-request-available
>
> If so, we can delete it by Flink rest API manually, and no need to query it 
> by name.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29561) Log the job id when clean up session job failed

2022-10-10 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-29561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi reassigned FLINK-29561:
--

Assignee: Xin Hao

> Log the job id when clean up session job failed
> ---
>
> Key: FLINK-29561
> URL: https://issues.apache.org/jira/browse/FLINK-29561
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Xin Hao
>Assignee: Xin Hao
>Priority: Minor
>
> If so, we can delete it by Flink rest API manually, and no need to query it 
> by name.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29503) Add backpressureLevel field without hyphens

2022-10-10 Thread Tiger (Apache) Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615198#comment-17615198
 ] 

Tiger (Apache) Wang edited comment on FLINK-29503 at 10/10/22 4:25 PM:
---

Verified the issue no longer repros with the yaml file from 
https://github.com/apache/flink/pull/20962/files.

 

I could build a JS package out of generated TypeScript code successfully.

 

Thanks.


was (Author: JIRAUSER279801):
Verified the issue no longer repros.

 

I could build a JS package out of generated TypeScript code successfully.

 

Thanks.

> Add backpressureLevel field without hyphens
> ---
>
> Key: FLINK-29503
> URL: https://issues.apache.org/jira/browse/FLINK-29503
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.15.2
>Reporter: Tiger (Apache) Wang
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: openapi
> Fix For: 1.16.0, 1.15.3
>
>
> Install nodejs and run
> {{$ npx --yes --package openapi-typescript-codegen openapi --input 
> [https://nightlies.apache.org/flink/flink-docs-release-1.15/generated/rest_v1_dispatcher.yml]
>  --output .}}
> {{$ npx --package typescript tsc }}
> The only thing it complains about is:
> {{src/models/JobVertexBackPressureInfo.ts:21:17 - error TS1003: Identifier 
> expected.}}
> {{21     export enum 'backpressure-level' {}}
> This is because for TypeScript, enum name should not have a hyphen in it.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

