[jira] [Commented] (FLINK-29739) [FLIP-265] Deprecate and remove Scala API support

2024-09-24 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884214#comment-17884214
 ] 

Jark Wu commented on FLINK-29739:
-

+1 for the approach proposed by [~xuyangzhong]. Besides, I think we still need 
to create sub-tasks to remove the Scala Table API usage in tests (it should be 
replaced by the Java Table API), so that removing the Scala Table API becomes 
easier if needed in the future. 
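
For reference, a minimal sketch of such a test migration, assuming a registered 
table {{Orders}} with columns {{name}} and {{cnt}} (all names here are 
illustrative):
{code:java}
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class JavaTableApiMigrationSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Scala Table API usage to be removed from tests:
        //   table.select($"name", $"cnt" + 1)
        // Java Table API equivalent, using Expressions.$:
        Table result = tEnv.from("Orders").select($("name"), $("cnt").plus(1));
        result.execute().print();
    }
}
{code}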

> [FLIP-265] Deprecate and remove Scala API support
> -
>
> Key: FLINK-29739
> URL: https://issues.apache.org/jira/browse/FLINK-29739
> Project: Flink
>  Issue Type: Technical Debt
>  Components: API / Scala
>Reporter: Martijn Visser
>Priority: Major
>  Labels: 2.0-related
>
> FLIP: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36004) Flink SQL returns wrong results for Paimon tables with complex schemas

2024-08-09 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872257#comment-17872257
 ] 

Jark Wu commented on FLINK-36004:
-

[~xuyangzhong] could you help take a look at this issue?
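
A minimal Java Table API sketch of the reported reproduction, based on the 
description quoted below (the table name and catalog setup are assumptions):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class NestedProjectionRepro {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Assumes a Paimon catalog is registered and 'paimon_table' has the
        // nested schema from the issue description below.
        // Reported to return the correct cardinality on its own:
        tEnv.executeSql("SELECT CARDINALITY(f1.f2.f3) AS r FROM paimon_table").print();
        // Reported to silently return wrong values for r once the parent
        // column f1 is also projected:
        tEnv.executeSql("SELECT CARDINALITY(f1.f2.f3) AS r, f1 FROM paimon_table").print();
    }
}
{code}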

> Flink SQL returns wrong results for Paimon tables with complex schemas
> --
>
> Key: FLINK-36004
> URL: https://issues.apache.org/jira/browse/FLINK-36004
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.1, 1.19.1
>Reporter: Xingcan Cui
>Priority: Blocker
> Attachments: screenshot-1.png
>
>
> We have a Paimon table with some nested fields, such as the following one.
> {code:java}
> `f1` ROW <
>     `f2` ROW <
>         `f3` ARRAY < ROW < `key` INT, `value` FLOAT > NOT NULL >,
>         `f4` ARRAY < ROW < `key` INT, `value` STRING > NOT NULL >,
>         `f5` ARRAY < ROW < `key` BIGINT, `value` FLOAT > NOT NULL >,
>         `f6` ARRAY < ROW < `key` BIGINT, `value` BIGINT > NOT NULL >,
>         `f7` ARRAY < ROW < `key` BIGINT, `value` ROW < `f71` ARRAY < FLOAT > > > NOT NULL >,
>         `f8` ARRAY < ROW < `f81` STRING, `f82` STRING, `f83` BIGINT > NOT NULL >,
>         `f9` ARRAY < ROW < `key` BIGINT, `value` ROW < `f91` ARRAY < BIGINT > > > NOT NULL >,
>         `f10` ROW < `f101` ARRAY < ROW < `f102` BIGINT, `f103` STRING, `f104` BIGINT > NOT NULL > >,
>         `f11` ARRAY < ROW < `key` BIGINT, `value` STRING > NOT NULL >
>     >
> {code}
> When a select query includes some nested columns, the results will be wrong.
> For example, {{SELECT CARDINALITY(f1.f2.f3) AS r FROM...WHERE...}} returns 
> correct results, but {{SELECT CARDINALITY(f1.f2.f3) AS r, f1 FROM...WHERE...}} 
> returns wrong values for {{r}}.
> The query execution doesn't throw any exception; it fails silently.
> I'm not sure if this is a Paimon-specific issue, but I also tested running 
> the same query with Spark and StarRocks, and both of them can produce correct 
> results.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35877) Shade protobuf in flink

2024-08-06 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-35877.
---
Resolution: Fixed

This is fixed on master: 68d8174c996f05b2fbf9c50d0220a72f29e3a56b

> Shade protobuf in flink
> ---
>
> Key: FLINK-35877
> URL: https://issues.apache.org/jira/browse/FLINK-35877
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.19.2
>Reporter: zhuanshenbsj1
>Assignee: zhuanshenbsj1
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Shade the protobuf classes to avoid class conflicts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35877) Shade protobuf in flink

2024-08-02 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-35877:
---

Assignee: zhuanshenbsj1

> Shade protobuf in flink
> ---
>
> Key: FLINK-35877
> URL: https://issues.apache.org/jira/browse/FLINK-35877
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.19.2
>Reporter: zhuanshenbsj1
>Assignee: zhuanshenbsj1
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Shade the protobuf classes to avoid class conflicts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35877) Shade protobuf in flink

2024-08-02 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-35877:

Fix Version/s: 2.0.0
   (was: 1.19.2)

> Shade protobuf in flink
> ---
>
> Key: FLINK-35877
> URL: https://issues.apache.org/jira/browse/FLINK-35877
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.19.2
>Reporter: zhuanshenbsj1
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Shade the protobuf classes to avoid class conflicts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35877) Shade protobuf in flink

2024-08-02 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-35877:

Component/s: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
 (was: BuildSystem / Shaded)

> Shade protobuf in flink
> ---
>
> Key: FLINK-35877
> URL: https://issues.apache.org/jira/browse/FLINK-35877
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.19.2
>Reporter: zhuanshenbsj1
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.2
>
>
> Shade the protobuf classes to avoid class conflicts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35842) Change the FlinkCDC dependency to 3.1.x

2024-07-15 Thread Di Wu (Jira)
Di Wu created FLINK-35842:
-

 Summary: Change the FlinkCDC dependency to 3.1.x
 Key: FLINK-35842
 URL: https://issues.apache.org/jira/browse/FLINK-35842
 Project: Flink
  Issue Type: Sub-task
Reporter: Di Wu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35841) Change package name to org.apache.flink

2024-07-15 Thread Di Wu (Jira)
Di Wu created FLINK-35841:
-

 Summary: Change package name to org.apache.flink
 Key: FLINK-35841
 URL: https://issues.apache.org/jira/browse/FLINK-35841
 Project: Flink
  Issue Type: Sub-task
Reporter: Di Wu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35840) Add documentation for Doris

2024-07-15 Thread Di Wu (Jira)
Di Wu created FLINK-35840:
-

 Summary: Add documentation for Doris
 Key: FLINK-35840
 URL: https://issues.apache.org/jira/browse/FLINK-35840
 Project: Flink
  Issue Type: Sub-task
Reporter: Di Wu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35839) Migrate repo from apache/doris to apache/flink

2024-07-15 Thread Di Wu (Jira)
Di Wu created FLINK-35839:
-

 Summary: Migrate repo from apache/doris to apache/flink
 Key: FLINK-35839
 URL: https://issues.apache.org/jira/browse/FLINK-35839
 Project: Flink
  Issue Type: Sub-task
Reporter: Di Wu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35838) FLIP-399: Flink Connector Doris

2024-07-15 Thread Di Wu (Jira)
Di Wu created FLINK-35838:
-

 Summary: FLIP-399: Flink Connector Doris
 Key: FLINK-35838
 URL: https://issues.apache.org/jira/browse/FLINK-35838
 Project: Flink
  Issue Type: New Feature
Reporter: Di Wu


As discussed on the Flink dev mailing list [1][2], we should finish the repo and 
doc migration as soon as possible.

[1] https://lists.apache.org/thread/w3hoglk0pqbzqhzlfcgzkkz3xrwo90rt
[2] https://lists.apache.org/thread/b32qvhzpmq06z2x5s9c8qr3pzsnld34m



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35753) ParquetColumnarRowInputFormatTest.testContinuousRepetition(int) failed due to ArrayIndexOutOfBoundsException

2024-07-09 Thread Wenchao Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864228#comment-17864228
 ] 

Wenchao Wu commented on FLINK-35753:


Hello [~Weijie Guo], I pulled the latest master branch and ran the command the 
same way the CI does, and it ran successfully. I have no idea what's wrong now 😂

> ParquetColumnarRowInputFormatTest.testContinuousRepetition(int) failed due to 
> ArrayIndexOutOfBoundsException
> 
>
> Key: FLINK-35753
> URL: https://issues.apache.org/jira/browse/FLINK-35753
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI, Connectors / Parent
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Major
>
> {code:java}
> Jul 02 01:23:50 01:23:50.105 [ERROR] 
> org.apache.flink.formats.parquet.ParquetColumnarRowInputFormatTest.testContinuousRepetition(int)[2]
>  -- Time elapsed: 1.886 s <<< ERROR!
> Jul 02 01:23:50 java.lang.ArrayIndexOutOfBoundsException: 500
> Jul 02 01:23:50   at 
> org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector.setNullAt(AbstractHeapVector.java:72)
> Jul 02 01:23:50   at 
> org.apache.flink.formats.parquet.vector.reader.NestedColumnReader.setFieldNullFalg(NestedColumnReader.java:251)
> Jul 02 01:23:50   at 
> org.apache.flink.formats.parquet.vector.reader.NestedColumnReader.readArray(NestedColumnReader.java:221)
> Jul 02 01:23:50   at 
> org.apache.flink.formats.parquet.vector.reader.NestedColumnReader.readData(NestedColumnReader.java:101)
> Jul 02 01:23:50   at 
> org.apache.flink.formats.parquet.vector.reader.NestedColumnReader.readToVector(NestedColumnReader.java:90)
> Jul 02 01:23:50   at 
> org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.nextBatch(ParquetVectorizedInputFormat.java:413)
> Jul 02 01:23:50   at 
> org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readBatch(ParquetVectorizedInputFormat.java:381)
> Jul 02 01:23:50   at 
> org.apache.flink.connector.file.src.util.Utils.forEachRemaining(Utils.java:81)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60587&view=logs&j=1c002d28-a73d-5309-26ee-10036d8476b4&t=d1c117a6-8f13-5466-55f0-d48dbb767fcd&l=12002



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35753) ParquetColumnarRowInputFormatTest.testContinuousRepetition(int) failed due to ArrayIndexOutOfBoundsException

2024-07-09 Thread Wenchao Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864048#comment-17864048
 ] 

Wenchao Wu commented on FLINK-35753:


OK, I will check it.

> ParquetColumnarRowInputFormatTest.testContinuousRepetition(int) failed due to 
> ArrayIndexOutOfBoundsException
> 
>
> Key: FLINK-35753
> URL: https://issues.apache.org/jira/browse/FLINK-35753
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI, Connectors / Parent
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Major
>
> {code:java}
> Jul 02 01:23:50 01:23:50.105 [ERROR] 
> org.apache.flink.formats.parquet.ParquetColumnarRowInputFormatTest.testContinuousRepetition(int)[2]
>  -- Time elapsed: 1.886 s <<< ERROR!
> Jul 02 01:23:50 java.lang.ArrayIndexOutOfBoundsException: 500
> Jul 02 01:23:50   at 
> org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector.setNullAt(AbstractHeapVector.java:72)
> Jul 02 01:23:50   at 
> org.apache.flink.formats.parquet.vector.reader.NestedColumnReader.setFieldNullFalg(NestedColumnReader.java:251)
> Jul 02 01:23:50   at 
> org.apache.flink.formats.parquet.vector.reader.NestedColumnReader.readArray(NestedColumnReader.java:221)
> Jul 02 01:23:50   at 
> org.apache.flink.formats.parquet.vector.reader.NestedColumnReader.readData(NestedColumnReader.java:101)
> Jul 02 01:23:50   at 
> org.apache.flink.formats.parquet.vector.reader.NestedColumnReader.readToVector(NestedColumnReader.java:90)
> Jul 02 01:23:50   at 
> org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.nextBatch(ParquetVectorizedInputFormat.java:413)
> Jul 02 01:23:50   at 
> org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readBatch(ParquetVectorizedInputFormat.java:381)
> Jul 02 01:23:50   at 
> org.apache.flink.connector.file.src.util.Utils.forEachRemaining(Utils.java:81)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60587&view=logs&j=1c002d28-a73d-5309-26ee-10036d8476b4&t=d1c117a6-8f13-5466-55f0-d48dbb767fcd&l=12002



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35753) ParquetColumnarRowInputFormatTest.testContinuousRepetition(int) failed due to ArrayIndexOutOfBoundsException

2024-07-08 Thread Wenchao Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863754#comment-17863754
 ] 

Wenchao Wu commented on FLINK-35753:


I ran the test alone and also did a full local install, but I cannot reproduce 
the error.

> ParquetColumnarRowInputFormatTest.testContinuousRepetition(int) failed due to 
> ArrayIndexOutOfBoundsException
> 
>
> Key: FLINK-35753
> URL: https://issues.apache.org/jira/browse/FLINK-35753
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Major
>
> {code:java}
> Jul 02 01:23:50 01:23:50.105 [ERROR] 
> org.apache.flink.formats.parquet.ParquetColumnarRowInputFormatTest.testContinuousRepetition(int)[2]
>  -- Time elapsed: 1.886 s <<< ERROR!
> Jul 02 01:23:50 java.lang.ArrayIndexOutOfBoundsException: 500
> Jul 02 01:23:50   at 
> org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector.setNullAt(AbstractHeapVector.java:72)
> Jul 02 01:23:50   at 
> org.apache.flink.formats.parquet.vector.reader.NestedColumnReader.setFieldNullFalg(NestedColumnReader.java:251)
> Jul 02 01:23:50   at 
> org.apache.flink.formats.parquet.vector.reader.NestedColumnReader.readArray(NestedColumnReader.java:221)
> Jul 02 01:23:50   at 
> org.apache.flink.formats.parquet.vector.reader.NestedColumnReader.readData(NestedColumnReader.java:101)
> Jul 02 01:23:50   at 
> org.apache.flink.formats.parquet.vector.reader.NestedColumnReader.readToVector(NestedColumnReader.java:90)
> Jul 02 01:23:50   at 
> org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.nextBatch(ParquetVectorizedInputFormat.java:413)
> Jul 02 01:23:50   at 
> org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readBatch(ParquetVectorizedInputFormat.java:381)
> Jul 02 01:23:50   at 
> org.apache.flink.connector.file.src.util.Utils.forEachRemaining(Utils.java:81)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60587&view=logs&j=1c002d28-a73d-5309-26ee-10036d8476b4&t=d1c117a6-8f13-5466-55f0-d48dbb767fcd&l=12002



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35702) Support Parquet Nested Read

2024-06-25 Thread Wenchao Wu (Jira)
Wenchao Wu created FLINK-35702:
--

 Summary: Support Parquet Nested Read
 Key: FLINK-35702
 URL: https://issues.apache.org/jira/browse/FLINK-35702
 Project: Flink
  Issue Type: New Feature
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Wenchao Wu


Currently, flink-parquet doesn't support reading nested columns (e.g. nested 
{{ARRAY}} types) in a vectorized way. This feature aims to solve this 
problem.
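
A hedged sketch of the kind of read this feature targets (schema, path, and 
column names are illustrative assumptions):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class NestedParquetReadSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // A filesystem table backed by Parquet files with a nested column.
        tEnv.executeSql(
                "CREATE TABLE nested_parquet ("
                        + " f ROW<elems ARRAY<ROW<k INT, v DOUBLE>>>"
                        + ") WITH ("
                        + " 'connector' = 'filesystem',"
                        + " 'path' = '/tmp/nested.parquet',"
                        + " 'format' = 'parquet'"
                        + ")");
        // Reading the nested column is what the vectorized path needs to support.
        tEnv.executeSql("SELECT f.elems FROM nested_parquet").print();
    }
}
{code}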



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35600) Data read duplication during the full-to-incremental conversion phase

2024-06-13 Thread Di Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Wu updated FLINK-35600:
--
Description: 
Assume that the table has been split into 3 Chunks

Timeline
t1: chunk1 is read
t2: a piece of data A belonging to chunk2 is inserted in MySQL
t3: chunk2 is read, and data A has been sent downstream
t4: chunk3 is read

At this time, startOffset will be set to lowwatermark
t5: *BinlogSplitReader.pollSplitRecords* receives data A, and uses the method 
*shouldEmit* to determine whether the data is sent downstream

In this method
{code:java}
private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) 
{
if (pureBinlogPhaseTables.contains(tableId)) {
return true;
}
// the existed tables those have finished snapshot reading
if (maxSplitHighWatermarkMap.containsKey(tableId)
&& position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
pureBinlogPhaseTables.add(tableId);
return true;
}
} {code}
*maxSplitHighWatermarkMap.get(tableId)* obtains the HighWatermark data without 
ts_sec variable, and the default value is 0
*position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))*
So this expression is judged as true

*Data A continues to be sent downstream, and the data is repeated*

  was:
Assume that the table has been split into 3 chunks.

Timeline:
t1: chunk1 is read
t2: a piece of data A belonging to chunk2 is inserted in MySQL
t3: chunk2 is read, and data A has been sent downstream
t4: chunk3 is read

At this time, startOffset will be set to the low watermark.
t5: BinlogSplitReader.pollSplitRecords receives data A and uses the method 
shouldEmit to determine whether the data is sent downstream.

In this method:
{code:java}
private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
    if (pureBinlogPhaseTables.contains(tableId)) {
        return true;
    }
    // the existing tables that have finished snapshot reading
    if (maxSplitHighWatermarkMap.containsKey(tableId)
            && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
        pureBinlogPhaseTables.add(tableId);
        return true;
    }
    // ... (rest of the method elided in the original report)
} {code}

*maxSplitHighWatermarkMap.get(tableId)* obtains the high watermark without the 
ts_sec variable, which therefore defaults to 0, so 
*position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))* evaluates to true.

*Data A continues to be sent downstream, and the data is duplicated.*


> Data read duplication during the full-to-incremental conversion phase
> -
>
> Key: FLINK-35600
> URL: https://issues.apache.org/jira/browse/FLINK-35600
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Di Wu
>Priority: Major
>  Labels: pull-request-available
>
> Assume that the table has been split into 3 chunks.
> Timeline:
> t1: chunk1 is read
> t2: a piece of data A belonging to chunk2 is inserted in MySQL
> t3: chunk2 is read, and data A has been sent downstream
> t4: chunk3 is read
> At this time, startOffset will be set to the low watermark.
> t5: *BinlogSplitReader.pollSplitRecords* receives data A and uses the method 
> *shouldEmit* to determine whether the data is sent downstream.
> In this method:
> {code:java}
> private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
>     if (pureBinlogPhaseTables.contains(tableId)) {
>         return true;
>     }
>     // the existing tables that have finished snapshot reading
>     if (maxSplitHighWatermarkMap.containsKey(tableId)
>             && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
>         pureBinlogPhaseTables.add(tableId);
>         return true;
>     }
>     // ... (rest of the method elided in the original report)
> } {code}
> *maxSplitHighWatermarkMap.get(tableId)* obtains the high watermark without 
> the ts_sec variable, which therefore defaults to 0, so 
> *position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))* evaluates to true.
> *Data A continues to be sent downstream, and the data is duplicated.*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35600) Data read duplication during the full-to-incremental conversion phase

2024-06-13 Thread Di Wu (Jira)
Di Wu created FLINK-35600:
-

 Summary: Data read duplication during the full-to-incremental 
conversion phase
 Key: FLINK-35600
 URL: https://issues.apache.org/jira/browse/FLINK-35600
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: Di Wu


Assume that the table has been split into 3 chunks.

Timeline:
t1: chunk1 is read
t2: a piece of data A belonging to chunk2 is inserted in MySQL
t3: chunk2 is read, and data A has been sent downstream
t4: chunk3 is read

At this time, startOffset will be set to the low watermark.
t5: BinlogSplitReader.pollSplitRecords receives data A and uses the method 
shouldEmit to determine whether the data is sent downstream.

In this method:
{code:java}
private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
    if (pureBinlogPhaseTables.contains(tableId)) {
        return true;
    }
    // the existing tables that have finished snapshot reading
    if (maxSplitHighWatermarkMap.containsKey(tableId)
            && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
        pureBinlogPhaseTables.add(tableId);
        return true;
    }
    // ... (rest of the method elided in the original report)
} {code}

*maxSplitHighWatermarkMap.get(tableId)* obtains the high watermark without the 
ts_sec variable, which therefore defaults to 0, so 
*position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))* evaluates to true.

*Data A continues to be sent downstream, and the data is duplicated.*
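
To make the failure mode concrete, here is a simplified model of the watermark 
comparison described above; the names and values are stand-ins, not the actual 
Flink CDC classes:
{code:java}
public class WatermarkComparisonSketch {
    public static void main(String[] args) {
        // When the stored high watermark carries no ts_sec it defaults to 0,
        // so every binlog event timestamp passes the isAtOrAfter-style check
        // and data A that was already emitted during the snapshot phase is
        // emitted again.
        long highWatermarkTsSec = 0L;            // ts_sec missing -> default 0
        long binlogEventTsSec = 1_718_236_800L;  // timestamp of re-read data A
        boolean emitAgain = binlogEventTsSec >= highWatermarkTsSec; // always true
        System.out.println("emit downstream again = " + emitAgain);
    }
}
{code}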



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35384) Expose TaskIOMetricGroup to custom Partitioner via init Context

2024-05-16 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-35384:
---
Summary: Expose TaskIOMetricGroup to custom Partitioner via init Context  
(was: Expose TaskIOMetricGroup to custom Partitioner)

> Expose TaskIOMetricGroup to custom Partitioner via init Context
> ---
>
> Key: FLINK-35384
> URL: https://issues.apache.org/jira/browse/FLINK-35384
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.9.4
>Reporter: Steven Zhen Wu
>Priority: Major
>
> I am trying to implement a custom range partitioner in the Flink Iceberg 
> sink. I want to publish some counter metrics for certain scenarios. This is 
> like the network metrics exposed in `TaskIOMetricGroup`.
> We can implement the range partitioner using the public interface from 
> `DataStream`: 
> {code}
> public <K> DataStream<T> partitionCustom(
>         Partitioner<K> partitioner, KeySelector<T, K> keySelector)
> {code}
> We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that 
> `CustomPartitionerWrapper` extends from. `CustomPartitionerWrapper` wraps the 
> public `Partitioner` interface, where we can implement the custom range 
> partitioner.
> The `Partitioner` interface is a functional interface today. We can add a new 
> default `setup` method without breaking backward compatibility.
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     *default void setup(TaskIOMetricGroup metrics) {}*
>     int partition(K key, int numPartitions);
> }
> {code}
> I know a public interface change requires a FLIP process. I will do that if 
> the community agrees with this feature request.
> Personally, `numPartitions` should be passed in the `setup` method too. But 
> it is a breaking change that is NOT worth the benefit right now.
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     default void setup(int numPartitions, TaskIOMetricGroup metrics) {}
>     int partition(K key);
> }
> {code}
> That would be similar to the `StreamPartitioner#setup()` method that we would 
> need to modify for passing the metrics group.
> {code}
> @Internal
> public abstract class StreamPartitioner<T>
>         implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, 
>         Serializable {
>     @Override
>     public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
>         this.numberOfChannels = numberOfChannels;
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35384) Expose TaskIOMetricGroup to custom Partitioner

2024-05-16 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847116#comment-17847116
 ] 

Steven Zhen Wu edited comment on FLINK-35384 at 5/17/24 12:06 AM:
--

One potential risk of this type of API is that it is not extensible: if we want 
to pass another argument to the partitioner, we need to break compatibility or 
add a new method.
{code}
default void setup(TaskIOMetricGroup metrics) {}
{code}

Maybe we can move to the context model that is widely used in Flink:
{code}
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    int partition(K key, int numPartitions);

    default void init(Context context) {}

    interface Context {
        int numberOfChannels();

        TaskIOMetricGroup metrics();
    }
}
{code}




was (Author: stevenz3wu):
One potential risk of this type of API is that it is not extensible: if we want 
to pass another argument to the partitioner, we need to break compatibility or 
add a new method.
{code}
default void setup(TaskIOMetricGroup metrics) {}
{code}

Maybe we can move to the context model that is widely used in Flink:
{code}
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    int partition(K key, int numPartitions);

    default void init(Context context) {}

    interface Context {
        int numberOfChannels();

        TaskIOMetricGroup metrics();
    }
}
{code}



> Expose TaskIOMetricGroup to custom Partitioner
> --
>
> Key: FLINK-35384
> URL: https://issues.apache.org/jira/browse/FLINK-35384
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.9.4
>Reporter: Steven Zhen Wu
>Priority: Major
>
> I am trying to implement a custom range partitioner in the Flink Iceberg 
> sink. I want to publish some counter metrics for certain scenarios. This is 
> like the network metrics exposed in `TaskIOMetricGroup`.
> We can implement the range partitioner using the public interface from 
> `DataStream`: 
> {code}
> public <K> DataStream<T> partitionCustom(
>         Partitioner<K> partitioner, KeySelector<T, K> keySelector)
> {code}
> We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that 
> `CustomPartitionerWrapper` extends from. `CustomPartitionerWrapper` wraps the 
> public `Partitioner` interface, where we can implement the custom range 
> partitioner.
> The `Partitioner` interface is a functional interface today. We can add a new 
> default `setup` method without breaking backward compatibility.
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     *default void setup(TaskIOMetricGroup metrics) {}*
>     int partition(K key, int numPartitions);
> }
> {code}
> I know a public interface change requires a FLIP process. I will do that if 
> the community agrees with this feature request.
> Personally, `numPartitions` should be passed in the `setup` method too. But 
> it is a breaking change that is NOT worth the benefit right now.
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     default void setup(int numPartitions, TaskIOMetricGroup metrics) {}
>     int partition(K key);
> }
> {code}
> That would be similar to the `StreamPartitioner#setup()` method that we would 
> need to modify for passing the metrics group.
> {code}
> @Internal
> public abstract class StreamPartitioner<T>
>         implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, 
>         Serializable {
>     @Override
>     public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
>         this.numberOfChannels = numberOfChannels;
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35384) Expose TaskIOMetricGroup to custom Partitioner

2024-05-16 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847116#comment-17847116
 ] 

Steven Zhen Wu commented on FLINK-35384:


One potential risk of this type of API is that it is not extensible: if we want 
to pass another argument to the partitioner, we need to break compatibility or 
add a new method.
{code}
default void setup(TaskIOMetricGroup metrics) {}
{code}

Maybe we can move to the context model that is widely used in Flink:
{code}
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    int partition(K key, int numPartitions);

    default void init(Context context) {}

    interface Context {
        int numberOfChannels();

        TaskIOMetricGroup metrics();
    }
}
{code}
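
To illustrate, a sketch of how a user partitioner could use the proposed 
context; the {{init}}/{{Context}} API is the proposal above, not an existing 
Flink interface, and the counter name is made up:
{code}
// Sketch only: assumes the proposed Partitioner#init(Context) from above.
public class RangePartitioner implements Partitioner<Long> {
    private transient Counter outOfRangeEvents; // hypothetical custom metric

    @Override
    public void init(Context context) {
        // TaskIOMetricGroup is a MetricGroup, so custom counters can be
        // registered on it.
        this.outOfRangeEvents = context.metrics().counter("outOfRangeEvents");
    }

    @Override
    public int partition(Long key, int numPartitions) {
        if (key < 0) {
            outOfRangeEvents.inc();
            return 0;
        }
        return (int) (key % numPartitions);
    }
}
{code}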



> Expose TaskIOMetricGroup to custom Partitioner
> --
>
> Key: FLINK-35384
> URL: https://issues.apache.org/jira/browse/FLINK-35384
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.9.4
>Reporter: Steven Zhen Wu
>Priority: Major
>
> I am trying to implement a custom range partitioner in the Flink Iceberg 
> sink. I want to publish some counter metrics for certain scenarios. This is 
> like the network metrics exposed in `TaskIOMetricGroup`.
> We can implement the range partitioner using the public interface from 
> `DataStream`: 
> {code}
> public <K> DataStream<T> partitionCustom(
>         Partitioner<K> partitioner, KeySelector<T, K> keySelector)
> {code}
> We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that 
> `CustomPartitionerWrapper` extends from. `CustomPartitionerWrapper` wraps the 
> public `Partitioner` interface, where we can implement the custom range 
> partitioner.
> The `Partitioner` interface is a functional interface today. We can add a new 
> default `setup` method without breaking backward compatibility.
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     *default void setup(TaskIOMetricGroup metrics) {}*
>     int partition(K key, int numPartitions);
> }
> {code}
> I know a public interface change requires a FLIP process. I will do that if 
> the community agrees with this feature request.
> Personally, `numPartitions` should be passed in the `setup` method too. But 
> it is a breaking change that is NOT worth the benefit right now.
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     default void setup(int numPartitions, TaskIOMetricGroup metrics) {}
>     int partition(K key);
> }
> {code}
> That would be similar to the `StreamPartitioner#setup()` method that we would 
> need to modify for passing the metrics group.
> {code}
> @Internal
> public abstract class StreamPartitioner<T>
>         implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, 
>         Serializable {
>     @Override
>     public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
>         this.numberOfChannels = numberOfChannels;
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35384) Expose TaskIOMetricGroup to custom Partitioner

2024-05-16 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-35384:
---
Summary: Expose TaskIOMetricGroup to custom Partitioner  (was: Expose 
metrics group to custom Partitioner)

> Expose TaskIOMetricGroup to custom Partitioner
> --
>
> Key: FLINK-35384
> URL: https://issues.apache.org/jira/browse/FLINK-35384
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.9.4
>Reporter: Steven Zhen Wu
>Priority: Major
>
> I am trying to implement a custom range partitioner in the Flink Iceberg 
> sink. I want to publish some counter metrics for certain scenarios. This is 
> like the network metrics exposed in `TaskIOMetricGroup`.
> We can implement the range partitioner using the public interface from 
> `DataStream`: 
> {code}
> public <K> DataStream<T> partitionCustom(
>         Partitioner<K> partitioner, KeySelector<T, K> keySelector)
> {code}
> We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that 
> `CustomPartitionerWrapper` extends from. `CustomPartitionerWrapper` wraps the 
> public `Partitioner` interface, where we can implement the custom range 
> partitioner.
> The `Partitioner` interface is a functional interface today. We can add a new 
> default `setup` method without breaking backward compatibility.
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     *default void setup(TaskIOMetricGroup metrics) {}*
>     int partition(K key, int numPartitions);
> }
> {code}
> I know a public interface change requires a FLIP process. I will do that if 
> the community agrees with this feature request.
> Personally, `numPartitions` should be passed in the `setup` method too. But 
> it is a breaking change that is NOT worth the benefit right now.
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     default void setup(int numPartitions, TaskIOMetricGroup metrics) {}
>     int partition(K key);
> }
> {code}
> That would be similar to the `StreamPartitioner#setup()` method that we would 
> need to modify for passing the metrics group.
> {code}
> @Internal
> public abstract class StreamPartitioner<T>
>         implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, 
>         Serializable {
>     @Override
>     public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
>         this.numberOfChannels = numberOfChannels;
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35384) Expose metrics group to custom Partitioner

2024-05-16 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-35384:
---
Description: 
I am trying to implement a custom range partitioner in the Flink Iceberg sink. 
I want to publish some counter metrics for certain scenarios. This is like the 
network metrics exposed in `TaskIOMetricGroup`.

We can implement the range partitioner using the public interface from 
`DataStream`: 
{code}
public <K> DataStream<T> partitionCustom(
        Partitioner<K> partitioner, KeySelector<T, K> keySelector)
{code}

We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that 
`CustomPartitionerWrapper` extends from. `CustomPartitionerWrapper` wraps the 
public `Partitioner` interface, where we can implement the custom range 
partitioner.

The `Partitioner` interface is a functional interface today. We can add a new 
default `setup` method without breaking backward compatibility.
{code}
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    *default void setup(TaskIOMetricGroup metrics) {}*
    int partition(K key, int numPartitions);
}
{code}

I know a public interface change requires a FLIP process. I will do that if the 
community agrees with this feature request.

Personally, `numPartitions` should be passed in the `setup` method too. But it 
is a breaking change that is NOT worth the benefit right now.
{code}
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    default void setup(int numPartitions, TaskIOMetricGroup metrics) {}
    int partition(K key);
}
{code}

That would be similar to the `StreamPartitioner#setup()` method that we would 
need to modify for passing the metrics group.
{code}
@Internal
public abstract class StreamPartitioner<T>
        implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, 
        Serializable {
    @Override
    public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
        this.numberOfChannels = numberOfChannels;
    }
{code}


  was:
I am trying to implement a custom range partitioner in the Flink Iceberg sink. 
I want to publish some counter metrics for certain scenarios. This is like the 
network metrics exposed in `TaskIOMetricGroup`.

We can implement the range partitioner using the public interface from 
`DataStream`: 
```
public <K> DataStream<T> partitionCustom(
        Partitioner<K> partitioner, KeySelector<T, K> keySelector)
```

We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that 
`CustomPartitionerWrapper` extends from. `CustomPartitionerWrapper` wraps the 
public `Partitioner` interface, where we can implement the custom range 
partitioner.

The `Partitioner` interface is a functional interface today. We can add a new 
default `setup` method without breaking backward compatibility.
```
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    *default void setup(TaskIOMetricGroup metrics) {}*
    int partition(K key, int numPartitions);
}
```

I know a public interface change requires a FLIP process. I will do that if the 
community agrees with this feature request.

Personally, `numPartitions` should be passed in the `setup` method too. But it 
is a breaking change that is NOT worth the benefit right now.
```
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    default void setup(int numPartitions, TaskIOMetricGroup metrics) {}
    int partition(K key);
}
```

That would be similar to the `StreamPartitioner#setup()` method that we would 
need to modify for passing the metrics group.
```
@Internal
public abstract class StreamPartitioner<T>
        implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, 
        Serializable {
    @Override
    public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
        this.numberOfChannels = numberOfChannels;
    }
```



> Expose metrics group to custom Partitioner
> --
>
> Key: FLINK-35384
> URL: https://issues.apache.org/jira/browse/FLINK-35384
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.9.4
>Reporter: Steven Zhen Wu
>Priority: Major
>
> I am trying to implement a custom range partitioner in the Flink Iceberg 
> sink. I want to publish some counter metrics for certain scenarios. This is 
> like the network metrics exposed in `TaskIOMetricGroup`.
> We can implement the range partitioner using the public interface from 
> `DataStream`: 
> {code}
> public <K> DataStream<T> partitionCustom(
>         Partitioner<K> partitioner, KeySelector<T, K> keySelector)
> {code}
> We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that 
> `CustomPartitionerWrapper` extends from. `CustomPartitionerWrapper` wraps the 
> public `Partitioner` interface, where we can implement the custom range 
> partitioner.
> The `Partitioner` interface is a functional interface today. We can add a new 
> default `setup` method without breaking backward compatibility.
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     *default void setup(TaskIOMetricGroup metrics) {}*
>     int partition(K key, int numPartitions);
> }
> {code}
> I know a public interface change requires a FLIP process. I will do that if 
> the community agrees with this feature request.
> Personally, `numPartitions` should be passed in the `setup` method too. But 
> it is a breaking change that is NOT worth the benefit right now.
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     default void setup(int numPartitions, TaskIOMetricGroup metrics) {}
>     int partition(K key);
> }
> {code}
> That would be similar to the `StreamPartitioner#setup()` method that we would 
> need to modify for passing the metrics group.
> {code}
> @Internal
> public abstract class StreamPartitioner<T>
>         implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, 
>         Serializable {
>     @Override
>     public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
>         this.numberOfChannels = numberOfChannels;
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

[jira] [Created] (FLINK-35384) Expose metrics group to custom Partitioner

2024-05-16 Thread Steven Zhen Wu (Jira)
Steven Zhen Wu created FLINK-35384:
--

 Summary: Expose metrics group to custom Partitioner
 Key: FLINK-35384
 URL: https://issues.apache.org/jira/browse/FLINK-35384
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Affects Versions: 1.9.4
Reporter: Steven Zhen Wu


I am trying to implement a custom range partitioner in the Flink Iceberg sink. 
I want to publish some counter metrics for certain scenarios. This is like the 
network metrics exposed in `TaskIOMetricGroup`.

We can implement the range partitioner using the public interface from 
`DataStream`: 
```
public <K> DataStream<T> partitionCustom(
        Partitioner<K> partitioner, KeySelector<T, K> keySelector)
```

We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that 
`CustomPartitionerWrapper` extends from. `CustomPartitionerWrapper` wraps the 
public `Partitioner` interface, where we can implement the custom range 
partitioner.

The `Partitioner` interface is a functional interface today. We can add a new 
default `setup` method without breaking backward compatibility.
```
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    *default void setup(TaskIOMetricGroup metrics) {}*
    int partition(K key, int numPartitions);
}
```

I know a public interface change requires a FLIP process. I will do that if the 
community agrees with this feature request.

Personally, `numPartitions` should be passed in the `setup` method too. But it 
is a breaking change that is NOT worth the benefit right now.
```
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    default void setup(int numPartitions, TaskIOMetricGroup metrics) {}
    int partition(K key);
}
```

That would be similar to the `StreamPartitioner#setup()` method that we would 
need to modify for passing the metrics group.
```
@Internal
public abstract class StreamPartitioner<T>
        implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, 
        Serializable {
    @Override
    public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
        this.numberOfChannels = numberOfChannels;
    }
```




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35353) Translate "Profiler" page into Chinese

2024-05-14 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-35353:
---

Assignee: Juan Zifeng

> Translate "Profiler" page into Chinese
> ---
>
> Key: FLINK-35353
> URL: https://issues.apache.org/jira/browse/FLINK-35353
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Documentation
>Affects Versions: 1.19.0
>Reporter: Juan Zifeng
>Assignee: Juan Zifeng
>Priority: Major
> Fix For: 1.19.0
>
>
> The link is 
> https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/ops/debugging/profiler/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-20400) Migrate test_streaming_sql.sh

2024-04-29 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-20400:
---

Assignee: Muhammet Orazov

> Migrate test_streaming_sql.sh
> -
>
> Key: FLINK-20400
> URL: https://issues.apache.org/jira/browse/FLINK-20400
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Tests
>Reporter: Jark Wu
>Assignee: Muhammet Orazov
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33934) Flink SQL Source use raw format maybe lead to data lost

2024-04-08 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834844#comment-17834844
 ] 

Jark Wu edited comment on FLINK-33934 at 4/8/24 8:31 AM:
-

[~yunta] yes, I think the raw format should avoid reusing RowData just like 
other formats (JSON, Avro).


was (Author: jark):
[~yunta] yes, I think the row format should avoid reuse RowData just like other 
formats (JSON, AVRO).

> Flink SQL Source use raw format maybe lead to data lost
> ---
>
> Key: FLINK-33934
> URL: https://issues.apache.org/jira/browse/FLINK-33934
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Runtime
>Affects Versions: 1.12.0, 1.13.0, 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0, 
> 1.19.0
>Reporter: Cai Liuyang
>Assignee: Yuan Kui
>Priority: Major
>
> In production we encountered a case that led to data loss. The job info: 
>    1. a Flink SQL job reads data from a message queue (our internal MQ) and 
> writes to Hive (it only selects the value field, not the metadata field)
>    2. the format of the source table is the raw format
>  
> But if we select the value field and the metadata field at the same time, the 
> data loss does not appear.
>  
> After reviewing the code, we found that the reason is the object reuse of the 
> raw format (see code 
> [RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]);
>  why object reuse leads to this problem is explained below (taking Kafka as an example):
>     1. RawFormatDeserializationSchema is used in the fetcher thread of the 
> SourceOperator; the fetcher thread reads and deserializes data from the Kafka 
> partition, then puts the data into the elementQueue (see code [SourceOperator 
> FetcherTask 
> |https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64])
>     2. The SourceOperator's main thread pulls data from the 
> elementQueue (which is shared with the fetcher thread) and processes it (see code 
> [SourceOperator main 
> thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188])
>     3. For RawFormatDeserializationSchema, its deserialize function 
> returns the same object ([reused rowData 
> object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62])
>     4. So, if the elementQueue holds elements that have not been consumed yet, 
> the fetcher thread can change the fields of the reused rowData that 
> RawFormatDeserializationSchema::deserialize returned, which leads to data 
> loss;
>  
> The reason that selecting the value and metadata fields at the same time does 
> not lose data is:
>    if we select the metadata field, a new RowData object is returned, see 
> code: [DynamicKafkaDeserializationSchema deserialize with metadata field 
> |https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]
>  and if we only select the value field, the RowData object returned by the 
> formatDeserializationSchema is reused, see code 
> [DynamicKafkaDeserializationSchema deserialize only with value 
> field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]
>  
> To solve this problem, I think we should remove the object reuse from 
> RawFormatDeserializationSchema.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33934) Flink SQL Source use raw format maybe lead to data lost

2024-04-08 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834844#comment-17834844
 ] 

Jark Wu commented on FLINK-33934:
-

[~yunta] yes, I think the raw format should avoid reusing RowData just like 
other formats (JSON, Avro).
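
A sketch of what that change could look like in 
{{RawFormatDeserializationSchema}} (simplified and not the actual patch; 
{{converter}} stands for the existing field-converter member of the class):
{code:java}
// Allocate a fresh row per record instead of mutating a shared instance, so a
// record buffered in the element queue can no longer be overwritten by the
// fetcher thread.
@Override
public RowData deserialize(byte[] message) {
    GenericRowData row = new GenericRowData(1);
    row.setField(0, converter.convert(message));
    return row;
}
{code}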

> Flink SQL Source use raw format maybe lead to data lost
> ---
>
> Key: FLINK-33934
> URL: https://issues.apache.org/jira/browse/FLINK-33934
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Runtime
>Affects Versions: 1.12.0, 1.13.0, 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0, 
> 1.19.0
>Reporter: Cai Liuyang
>Assignee: Yuan Kui
>Priority: Major
>
> In production we encountered a case that led to data loss. The job info: 
>    1. a Flink SQL job reads data from a message queue (our internal MQ) and 
> writes to Hive (it only selects the value field, not the metadata field)
>    2. the format of the source table is the raw format
>  
> But if we select the value field and the metadata field at the same time, the 
> data loss does not appear.
>  
> After reviewing the code, we found that the reason is the object reuse of the 
> raw format (see code 
> [RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]);
>  why object reuse leads to this problem is explained below (taking Kafka as an example):
>     1. RawFormatDeserializationSchema is used in the fetcher thread of the 
> SourceOperator; the fetcher thread reads and deserializes data from the Kafka 
> partition, then puts the data into the elementQueue (see code [SourceOperator 
> FetcherTask 
> |https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64])
>     2. The SourceOperator's main thread pulls data from the 
> elementQueue (which is shared with the fetcher thread) and processes it (see code 
> [SourceOperator main 
> thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188])
>     3. For RawFormatDeserializationSchema, its deserialize function 
> returns the same object ([reused rowData 
> object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62])
>     4. So, if the elementQueue holds elements that have not been consumed yet, 
> the fetcher thread can change the fields of the reused rowData that 
> RawFormatDeserializationSchema::deserialize returned, which leads to data 
> loss;
>  
> The reason that selecting the value and metadata fields at the same time does 
> not lose data is:
>    if we select the metadata field, a new RowData object is returned, see 
> code: [DynamicKafkaDeserializationSchema deserialize with metadata field 
> |https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]
>  and if we only select the value field, the RowData object returned by the 
> formatDeserializationSchema is reused, see code 
> [DynamicKafkaDeserializationSchema deserialize only with value 
> field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]
>  
> To solve this problem, I think we should remove the object reuse from 
> RawFormatDeserializationSchema.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34654) Add "Special Thanks" Page on the Flink Website

2024-03-12 Thread Jark Wu (Jira)
Jark Wu created FLINK-34654:
---

 Summary: Add "Special Thanks" Page on the Flink Website
 Key: FLINK-34654
 URL: https://issues.apache.org/jira/browse/FLINK-34654
 Project: Flink
  Issue Type: New Feature
  Components: Project Website
Reporter: Jark Wu
Assignee: Jark Wu


This issue aims to add a "Special Thanks" page on the Flink website 
(https://flink.apache.org/) to honor and appreciate the companies and 
organizations that have sponsored machines or services for our project.

Discussion thread: 
https://lists.apache.org/thread/y5g0nd5t8h2ql4gq7d0kb9tkwv1wkm1j



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-20399) Migrate test_sql_client.sh

2024-03-05 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-20399.
---
  Assignee: (was: Lorenzo Affetti)
Resolution: Fixed

> Migrate test_sql_client.sh
> --
>
> Key: FLINK-20399
> URL: https://issues.apache.org/jira/browse/FLINK-20399
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Client, Tests
>Reporter: Jark Wu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-20399) Migrate test_sql_client.sh

2024-03-05 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823859#comment-17823859
 ] 

Jark Wu commented on FLINK-20399:
-

[~lorenzo.affetti] Yes, I think so. Thank you for the reminder. 

> Migrate test_sql_client.sh
> --
>
> Key: FLINK-20399
> URL: https://issues.apache.org/jira/browse/FLINK-20399
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Client, Tests
>Reporter: Jark Wu
>Assignee: Lorenzo Affetti
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34214) FLIP-377: Support fine-grained configuration to control filter push down for Table/SQL Sources

2024-01-25 Thread Rascal Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17811142#comment-17811142
 ] 

Rascal Wu commented on FLINK-34214:
---

Hi, may I ask one question: if we disable filter push down for the JDBC/MongoDB 
source, does it mean the source will scan the whole table?

And is the performance of scanning the whole table worse than filtering data 
via an unindexed column?
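
For context, the FLIP's new option is set per table. A hedged sketch of how it 
might look on a JDBC source (the option key comes from the FLIP; the value name 
and the remaining options are illustrative assumptions):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FilterPushdownPolicySketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // 'never' would keep all filters in Flink instead of pushing them to
        // the database; verify the exact value names against the FLIP/docs.
        tEnv.executeSql(
                "CREATE TABLE orders (id BIGINT, amount DOUBLE) WITH ("
                        + " 'connector' = 'jdbc',"
                        + " 'url' = 'jdbc:mysql://localhost:3306/db',"
                        + " 'table-name' = 'orders',"
                        + " 'filter.handling.policy' = 'never'"
                        + ")");
    }
}
{code}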

> FLIP-377: Support fine-grained configuration to control filter push down for 
> Table/SQL Sources
> --
>
> Key: FLINK-34214
> URL: https://issues.apache.org/jira/browse/FLINK-34214
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / JDBC, Connectors / MongoDB
>Affects Versions: mongodb-1.0.2, jdbc-3.1.2
>Reporter: Jiabao Sun
>Priority: Major
> Fix For: mongodb-1.1.0, jdbc-3.1.3
>
>
> This improvement implements [FLIP-377 Support fine-grained configuration to 
> control filter push down for Table/SQL 
> Sources|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=276105768]
> This FLIP has 2 goals:
>  * Introduce a new configuration filter.handling.policy for the JDBC and 
> MongoDB connectors.
>  * Suggest a conventional option name for other connectors that add an 
> option for the same purpose.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34147) TimestampData to/from LocalDateTime is ambiguous

2024-01-24 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17810723#comment-17810723
 ] 

Jark Wu commented on FLINK-34147:
-

I agree it is ambiguous. Currently, {{TimestampData}} represents both 
{{Instant}} and {{LocalDateTime}}. 
* When it represents an {{Instant}}, use the conversion methods 
{{fromInstant()/toInstant()}} to convert between {{Instant}} and 
{{TimestampData}}, but do not use {{fromLocalDateTime()/toLocalDateTime()}}. 
* When it represents a {{LocalDateTime}}, use the conversion methods 
{{fromLocalDateTime()/toLocalDateTime()}} to convert between {{LocalDateTime}} 
and {{TimestampData}}, but do not use {{fromInstant()/toInstant()}}. 

Therefore, if you have a Java {{LocalDateTime}} and want to convert it into an 
Instant-semantics {{TimestampData}}, you need to convert the {{LocalDateTime}} 
into a Java {{Instant}} with a specific time zone first, and then use the 
{{Instant}} to create the {{TimestampData}}.

This is not very clear from the javadoc of {{TimestampData}}, and I agree we 
need to improve the doc. 
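
A small sketch of that conversion path (the zone choice is up to the caller; 
the {{ZoneId}} and values below are assumptions for illustration):
{code:java}
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import org.apache.flink.table.data.TimestampData;

public class TimestampDataConversion {
    public static void main(String[] args) {
        // Anchor the LocalDateTime to an explicit zone first, then build the
        // Instant-semantics TimestampData from the resulting Instant.
        LocalDateTime ldt = LocalDateTime.of(2024, 1, 24, 12, 0);
        Instant instant = ldt.atZone(ZoneId.of("UTC")).toInstant();
        TimestampData ts = TimestampData.fromInstant(instant);
        System.out.println(ts);
    }
}
{code}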

> TimestampData to/from LocalDateTime is ambiguous
> 
>
> Key: FLINK-34147
> URL: https://issues.apache.org/jira/browse/FLINK-34147
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Rui Li
>Priority: Major
>
> It seems TimestampData is essentially an {{Instant}}. Therefore an implicit 
> time zone is used in the {{fromLocalDateTime}} and {{toLocalDateTime}} 
> methods. However, neither the method name nor the API doc indicates which 
> time zone is used. So from the caller's perspective, the results of these two 
> methods are ambiguous.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-27331) Support Avro microsecond precision for TIME

2024-01-22 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-27331:
---

Assignee: Peter Huang

> Support Avro microsecond precision for TIME
> ---
>
> Key: FLINK-27331
> URL: https://issues.apache.org/jira/browse/FLINK-27331
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Marios Trivyzas
>Assignee: Peter Huang
>Priority: Major
>
> Avro spec: 
> https://avro.apache.org/docs/1.8.0/spec.html#Time+%28microsecond+precision%29



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-24939) Support 'SHOW CREATE CATALOG' syntax

2024-01-01 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-24939:
---

Assignee: Yubin Li

> Support 'SHOW CREATE CATALOG' syntax
> 
>
> Key: FLINK-24939
> URL: https://issues.apache.org/jira/browse/FLINK-24939
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.14.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
>
> SHOW CREATE CATALOG <catalog_name>;
>  
> `Catalog` is playing a more important role in Flink; it would be great to be 
> able to get the detailed information of an existing catalog.
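A minimal sketch of how the proposed statement could be used from the Table API
(the catalog name is illustrative; 'generic_in_memory' is a built-in catalog
type used here only for demonstration):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ShowCreateCatalogSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql("CREATE CATALOG my_catalog WITH ('type' = 'generic_in_memory')");
        // The proposed statement would print the DDL that created the catalog.
        tEnv.executeSql("SHOW CREATE CATALOG my_catalog").print();
    }
}
{code}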



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-20281) Window aggregation supports changelog stream input

2023-12-26 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-20281:
---

Assignee: xuyang

> Window aggregation supports changelog stream input
> --
>
> Key: FLINK-20281
> URL: https://issues.apache.org/jira/browse/FLINK-20281
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner, Table SQL / Runtime
>Reporter: Jark Wu
>Assignee: xuyang
>Priority: Major
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
> Attachments: screenshot-1.png
>
>
> Currently, window aggregation doesn't support consuming a changelog stream. 
> This makes it impossible to do a window aggregation on changelog sources 
> (e.g. Kafka with Debezium format, or upsert-kafka, or mysql-cdc). 
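To make the limitation concrete, below is a hedged sketch of the kind of query
this issue would enable. The table name, schema, and connection details are
assumptions; the windowing TVF syntax itself is standard Flink SQL.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class WindowOverChangelogSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // An upsert-kafka table is a changelog source; all names are illustrative.
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + "  order_id STRING,"
                        + "  amount DOUBLE,"
                        + "  order_time TIMESTAMP(3),"
                        + "  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND,"
                        + "  PRIMARY KEY (order_id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'upsert-kafka',"
                        + "  'topic' = 'orders',"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',"
                        + "  'key.format' = 'json',"
                        + "  'value.format' = 'json'"
                        + ")");
        // The window aggregation this issue aims to support over changelog input:
        tEnv.executeSql(
                "SELECT window_start, window_end, SUM(amount) AS total "
                        + "FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '5' MINUTE)) "
                        + "GROUP BY window_start, window_end");
    }
}
{code}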



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33600) Print cost time for batch queries in SQL Client

2023-11-20 Thread Jark Wu (Jira)
Jark Wu created FLINK-33600:
---

 Summary: Print cost time for batch queries in SQL Client
 Key: FLINK-33600
 URL: https://issues.apache.org/jira/browse/FLINK-33600
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Client
Reporter: Jark Wu
 Fix For: 1.19.0


Currently, there is no cost-time information printed when executing batch 
queries in the SQL CLI, but such information is very helpful in OLAP/ad-hoc 
scenarios. 

For example: 
{code}
Flink SQL> select * from (values ('abc', 123));
+++
| EXPR$0 | EXPR$1 |
+++
|abc |123 |
+++
1 row in set  (0.22 seconds)
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-20281) Window aggregation supports changelog stream input

2023-10-15 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-20281:

Priority: Major  (was: Not a Priority)

> Window aggregation supports changelog stream input
> --
>
> Key: FLINK-20281
> URL: https://issues.apache.org/jira/browse/FLINK-20281
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner, Table SQL / Runtime
>Reporter: Jark Wu
>Priority: Major
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
> Attachments: screenshot-1.png
>
>
> Currently, window aggregation doesn't support consuming a changelog stream. 
> This makes it impossible to do a window aggregation on changelog sources 
> (e.g. Kafka with Debezium format, or upsert-kafka, or mysql-cdc). 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33063) UDAF with user-defined POJO object throws error while generating record equaliser

2023-09-09 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17763319#comment-17763319
 ] 

Jark Wu commented on FLINK-33063:
-

Fixed in 
- master: 01cb6ef008273100718cd8a8cfd921c02b8b8caa
- release-1.18: TODO

> UDAF with user-defined POJO object throws error while generating record equaliser
> --
>
> Key: FLINK-33063
> URL: https://issues.apache.org/jira/browse/FLINK-33063
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> A UDAF with a user-defined POJO object throws an error while generating the 
> record equaliser: 
> when a user creates a UDAF whose record contains a user-defined complex POJO 
> object (like a List or Map), the codegen 
> will throw an error while generating the record equaliser. The error is:
> {code:java}
> A method named "compareTo" is not declared in any enclosing class nor any 
> subtype, nor through a static import.{code}
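A minimal sketch of the shape of UDAF that hits this (all class and field names
are illustrative, not from the original report): the accumulator carries a
user-defined POJO with a complex List field, which the generated record
equaliser must compare without assuming a {{compareTo}} method exists.

{code:java}
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.functions.AggregateFunction;

public class LastOrderUdafSketch {

    /** A user-defined POJO with a complex (List) field. */
    public static class Order {
        public String id;
        public List<String> items = new ArrayList<>();
    }

    /** Accumulator holding the POJO; the record equaliser is generated over this. */
    public static class Acc {
        public Order last;
    }

    /** UDAF that keeps the last Order seen. */
    public static class LastOrder extends AggregateFunction<Order, Acc> {
        @Override
        public Acc createAccumulator() {
            return new Acc();
        }

        public void accumulate(Acc acc, Order order) {
            acc.last = order;
        }

        @Override
        public Order getValue(Acc acc) {
            return acc.last;
        }
    }
}
{code}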



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33063) UDAF with user-defined POJO object throws error while generating record equaliser

2023-09-09 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-33063:

Fix Version/s: 1.19.0

> UDAF with user-defined POJO object throws error while generating record equaliser
> --
>
> Key: FLINK-33063
> URL: https://issues.apache.org/jira/browse/FLINK-33063
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.19.0
>
>
> A UDAF with a user-defined POJO object throws an error while generating the 
> record equaliser: 
> when a user creates a UDAF whose record contains a user-defined complex POJO 
> object (like a List or Map), the codegen 
> will throw an error while generating the record equaliser. The error is:
> {code:java}
> A method named "compareTo" is not declared in any enclosing class nor any 
> subtype, nor through a static import.{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33015) TableAggregateFunction codegen does not eval emitUpdateWithRetract

2023-08-31 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17761195#comment-17761195
 ] 

Jark Wu commented on FLINK-33015:
-

Maybe a duplicate of https://issues.apache.org/jira/browse/FLINK-31788?

> TableAggregateFunction codegen does not eval emitUpdateWithRetract
> --
>
> Key: FLINK-33015
> URL: https://issues.apache.org/jira/browse/FLINK-33015
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.15.0, 1.15.1, 1.16.0, 1.15.2, 1.15.3, 1.16.1, 1.15.4, 
> 1.16.2, 1.18.0, 1.17.1
>Reporter: Jane Chan
>Priority: Major
>
> https://www.mail-archive.com/user-zh@flink.apache.org/msg15251.html
> This issue arose after the retirement of BlinkPlanner



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32952) Scan reuse with readable metadata and watermark push down will get wrong watermark

2023-08-30 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760471#comment-17760471
 ] 

Jark Wu commented on FLINK-32952:
-

Fixed in 
- master: 103de5bf136816ce1e520f372e17b162e4aa2ba7
- release-1.18: TODO

> Scan reuse with readable metadata and watermark push down will get wrong 
> watermark 
> ---
>
> Key: FLINK-32952
> URL: https://issues.apache.org/jira/browse/FLINK-32952
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.19.0
>
>
> Scan reuse with readable metadata and watermark push down will get a wrong 
> result. In class ScanReuser, we re-build the watermark spec after projection 
> push down. However, we get a wrong index while trying to find the index in 
> the new source type.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32952) Scan reuse with readable metadata and watermark push down will get wrong watermark

2023-08-30 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-32952:

Fix Version/s: 1.19.0

> Scan reuse with readable metadata and watermark push down will get wrong 
> watermark 
> ---
>
> Key: FLINK-32952
> URL: https://issues.apache.org/jira/browse/FLINK-32952
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.19.0
>
>
> Scan reuse with readable metadata and watermark push down will get a wrong 
> result. In class ScanReuser, we re-build the watermark spec after projection 
> push down. However, we get a wrong index while trying to find the index in 
> the new source type.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32952) Scan reuse with readable metadata and watermark push down will get wrong watermark

2023-08-30 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-32952:
---

Assignee: Yunhong Zheng

> Scan reuse with readable metadata and watermark push down will get wrong 
> watermark 
> ---
>
> Key: FLINK-32952
> URL: https://issues.apache.org/jira/browse/FLINK-32952
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.19.0
>
>
> Scan reuse with readable metadata and watermark push down will get a wrong 
> result. In class ScanReuser, we re-build the watermark spec after projection 
> push down. However, we get a wrong index while trying to find the index in 
> the new source type.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32905) Fix the bug that broadcast hash join doesn't support spill to disk when operator fusion codegen is enabled

2023-08-24 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17758000#comment-17758000
 ] 

Jark Wu edited comment on FLINK-32905 at 8/24/23 11:22 AM:
---

Fixed in 
 - master: 2cf32e82773254b79af4dad7ee317db4878d6f8a
 - release-1.18: c78f440533efe68687ab1e6e04d3407e40303de7


was (Author: jark):
Fixed in 
 - master: 2cf32e82773254b79af4dad7ee317db4878d6f8a
 - release-1.18: TODO

> Fix the bug that broadcast hash join doesn't support spill to disk when 
> operator fusion codegen is enabled
> ---
>
> Key: FLINK-32905
> URL: https://issues.apache.org/jira/browse/FLINK-32905
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32905) Fix the bug that broadcast hash join doesn't support spill to disk when operator fusion codegen is enabled

2023-08-24 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-32905.
---
Resolution: Fixed

> Fix the bug that broadcast hash join doesn't support spill to disk when 
> operator fusion codegen is enabled
> ---
>
> Key: FLINK-32905
> URL: https://issues.apache.org/jira/browse/FLINK-32905
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32905) Fix the bug that broadcast hash join doesn't support spill to disk when operator fusion codegen is enabled

2023-08-23 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17758000#comment-17758000
 ] 

Jark Wu commented on FLINK-32905:
-

Fixed in 
 - master: 2cf32e82773254b79af4dad7ee317db4878d6f8a
 - release-1.18: TODO

> Fix the bug that broadcast hash join doesn't support spill to disk when 
> operator fusion codegen is enabled
> ---
>
> Key: FLINK-32905
> URL: https://issues.apache.org/jira/browse/FLINK-32905
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32905) Fix the bug that broadcast hash join doesn't support spill to disk when operator fusion codegen is enabled

2023-08-23 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-32905:

Fix Version/s: 1.19.0

> Fix the bug that broadcast hash join doesn't support spill to disk when 
> operator fusion codegen is enabled
> ---
>
> Key: FLINK-32905
> URL: https://issues.apache.org/jira/browse/FLINK-32905
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32905) Fix the bug that broadcast hash join doesn't support spill to disk when operator fusion codegen is enabled

2023-08-23 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-32905:
---

Assignee: dalongliu

> Fix the bug that broadcast hash join doesn't support spill to disk when 
> operator fusion codegen is enabled
> ---
>
> Key: FLINK-32905
> URL: https://issues.apache.org/jira/browse/FLINK-32905
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF

2023-08-23 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17757993#comment-17757993
 ] 

Jark Wu commented on FLINK-32940:
-

[~337361...@qq.com] do you have time to help take a look?

> Support projection pushdown to table source for column projections through 
> UDTF
> ---
>
> Key: FLINK-32940
> URL: https://issues.apache.org/jira/browse/FLINK-32940
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
>
> Currently, Flink doesn't push down columns projected through a UDTF like 
> _UNNEST_ to the table source.
> For example:
> {code:java}
> select t1.name, t2.ename from DEPT_NESTED as t1, unnest(t1.employees) as 
> t2{code}
> For the above SQL, Flink projects all the columns of DEPT_NESTED rather than 
> only _name_ and _employees_. If the table source supports nested-field 
> column projection, ideally it should project only _t1.employees.ename_ from 
> the table source.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF

2023-08-23 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17757992#comment-17757992
 ] 

Jark Wu edited comment on FLINK-32940 at 8/23/23 12:21 PM:
---

Hi [~vsowrirajan], thank you for reporting this problem. Which Flink version 
are you using? And could you share the execution plan of the query by running 
an `EXPLAIN` statement on it, e.g. as sketched below?
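A minimal sketch (the DEPT_NESTED DDL is omitted since the reporter's schema is
not part of this thread; the query is the one from the report):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // DEPT_NESTED must be registered first (DDL omitted here).
        // EXPLAIN prints the optimized execution plan for the query.
        tEnv.executeSql(
                "EXPLAIN SELECT t1.name, t2.ename "
                        + "FROM DEPT_NESTED AS t1, UNNEST(t1.employees) AS t2").print();
    }
}
{code}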


was (Author: jark):
Hi [~vsowrirajan], thank you for reporting this problem. Which Flink version 
are you using? 

> Support projection pushdown to table source for column projections through 
> UDTF
> ---
>
> Key: FLINK-32940
> URL: https://issues.apache.org/jira/browse/FLINK-32940
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
>
> Currently, Flink doesn't push down columns projected through a UDTF like 
> _UNNEST_ to the table source.
> For example:
> {code:java}
> select t1.name, t2.ename from DEPT_NESTED as t1, unnest(t1.employees) as 
> t2{code}
> For the above SQL, Flink projects all the columns of DEPT_NESTED rather than 
> only _name_ and _employees_. If the table source supports nested-field 
> column projection, ideally it should project only _t1.employees.ename_ from 
> the table source.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF

2023-08-23 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17757992#comment-17757992
 ] 

Jark Wu commented on FLINK-32940:
-

Hi [~vsowrirajan], thank you for reporting this problem. Which Flink version 
are you using? 

> Support projection pushdown to table source for column projections through 
> UDTF
> ---
>
> Key: FLINK-32940
> URL: https://issues.apache.org/jira/browse/FLINK-32940
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
>
> Currently, Flink doesn't push down columns projected through a UDTF like 
> _UNNEST_ to the table source.
> For example:
> {code:java}
> select t1.name, t2.ename from DEPT_NESTED as t1, unnest(t1.employees) as 
> t2{code}
> For the above SQL, Flink projects all the columns of DEPT_NESTED rather than 
> only _name_ and _employees_. If the table source supports nested-field 
> column projection, ideally it should project only _t1.employees.ename_ from 
> the table source.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32909) The jobmanager.sh script fails to pass arguments

2023-08-22 Thread Alex Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17757763#comment-17757763
 ] 

Alex Wu commented on FLINK-32909:
-

Yes, I would be happy to. Emmm, I should prepare a PR next, right?

> The jobmanager.sh script fails to pass arguments
> ---
>
> Key: FLINK-32909
> URL: https://issues.apache.org/jira/browse/FLINK-32909
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Scripts
>Affects Versions: 1.16.2, 1.18.0, 1.17.1
>Reporter: Alex Wu
>Priority: Major
>
> I'm trying to use the jobmanager.sh script to create a jobmanager instance 
> manually, and I need to pass arguments to the script dynamically, rather than 
> through flink-conf.yaml. But I found that I didn't succeed in doing that when 
> I commented out all configurations in the flink-conf.yaml and typed a command 
> like:
>  
> {code:java}
> ./bin/jobmanager.sh start -D jobmanager.memory.flink.size=1024m -D 
> jobmanager.rpc.address=xx.xx.xx.xx -D jobmanager.rpc.port=xxx -D 
> jobmanager.bind-host=0.0.0.0 -D rest.address=xx.xx.xx.xx -D rest.port=xxx -D 
> rest.bind-address=0.0.0.0{code}
> but I got some errors below:
>  
> {code:java}
> [ERROR] The execution result is empty.
> [ERROR] Could not get JVM parameters and dynamic configurations properly.
> [ERROR] Raw output from BashJavaUtils:
> WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will 
> impact performance.
> Exception in thread "main" 
> org.apache.flink.configuration.IllegalConfigurationException: JobManager 
> memory configuration failed: Either required fine-grained memory 
> (jobmanager.memory.heap.size), or Total Flink Memory size (Key: 
> 'jobmanager.memory.flink.size' , default: null (fallback keys: [])), or Total 
> Process Memory size (Key: 'jobmanager.memory.process.size' , default: null 
> (fallback keys: [])) need to be configured explicitly.
>         at 
> org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(JobManagerProcessUtils.java:78)
>         at 
> org.apache.flink.runtime.util.bash.BashJavaUtils.getJmResourceParams(BashJavaUtils.java:98)
>         at 
> org.apache.flink.runtime.util.bash.BashJavaUtils.runCommand(BashJavaUtils.java:69)
>         at 
> org.apache.flink.runtime.util.bash.BashJavaUtils.main(BashJavaUtils.java:56)
> Caused by: org.apache.flink.configuration.IllegalConfigurationException: 
> Either required fine-grained memory (jobmanager.memory.heap.size), or Total 
> Flink Memory size (Key: 'jobmanager.memory.flink.size' , default: null 
> (fallback keys: [])), or Total Process Memory size (Key: 
> 'jobmanager.memory.process.size' , default: null (fallback keys: [])) need to 
> be configured explicitly.
>         at 
> org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils.failBecauseRequiredOptionsNotConfigured(ProcessMemoryUtils.java:129)
>         at 
> org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils.memoryProcessSpecFromConfig(ProcessMemoryUtils.java:86)
>         at 
> org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.processSpecFromConfig(JobManagerProcessUtils.java:83)
>         at 
> org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(JobM
>  {code}
> It seems to remind me to configure memory for the jobmanager instance 
> explicitly, but I had already passed the jobmanager.memory.flink.size 
> parameter. So I debugged the script and found a spelling error in the 
> jobmanager.sh script at line 54:
>  
> {code:java}
> parseJmArgsAndExportLogs "${ARGS[@]}"
> {code}
> the uppercase "$\{ARGS[@]}" is a wrong variable name here from a contextual 
> perspective, and causing an empty string passed to the function. I changed to 
> "$\{args[@]}" and It works fine.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32909) The jobmanager.sh script fails to pass arguments

2023-08-22 Thread Alex Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex Wu updated FLINK-32909:

Description: 
I'm trying to use the jobmanager.sh script to create a jobmanager instance 
manually, and I need to pass arguments to the script dynamically, rather than 
through flink-conf.yaml. But I found that I didn't succeed in doing that when I 
commented out all configurations in the flink-conf.yaml and typed a command like:

 
{code:java}
./bin/jobmanager.sh start -D jobmanager.memory.flink.size=1024m -D 
jobmanager.rpc.address=xx.xx.xx.xx -D jobmanager.rpc.port=xxx -D 
jobmanager.bind-host=0.0.0.0 -D rest.address=xx.xx.xx.xx -D rest.port=xxx -D 
rest.bind-address=0.0.0.0{code}
but I got some errors below:

 
{code:java}
[ERROR] The execution result is empty.
[ERROR] Could not get JVM parameters and dynamic configurations properly.
[ERROR] Raw output from BashJavaUtils:
WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will 
impact performance.
Exception in thread "main" 
org.apache.flink.configuration.IllegalConfigurationException: JobManager memory 
configuration failed: Either required fine-grained memory 
(jobmanager.memory.heap.size), or Total Flink Memory size (Key: 
'jobmanager.memory.flink.size' , default: null (fallback keys: [])), or Total 
Process Memory size (Key: 'jobmanager.memory.process.size' , default: null 
(fallback keys: [])) need to be configured explicitly.
        at 
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(JobManagerProcessUtils.java:78)
        at 
org.apache.flink.runtime.util.bash.BashJavaUtils.getJmResourceParams(BashJavaUtils.java:98)
        at 
org.apache.flink.runtime.util.bash.BashJavaUtils.runCommand(BashJavaUtils.java:69)
        at 
org.apache.flink.runtime.util.bash.BashJavaUtils.main(BashJavaUtils.java:56)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Either 
required fine-grained memory (jobmanager.memory.heap.size), or Total Flink 
Memory size (Key: 'jobmanager.memory.flink.size' , default: null (fallback 
keys: [])), or Total Process Memory size (Key: 'jobmanager.memory.process.size' 
, default: null (fallback keys: [])) need to be configured explicitly.
        at 
org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils.failBecauseRequiredOptionsNotConfigured(ProcessMemoryUtils.java:129)
        at 
org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils.memoryProcessSpecFromConfig(ProcessMemoryUtils.java:86)
        at 
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.processSpecFromConfig(JobManagerProcessUtils.java:83)
        at 
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(JobM
 {code}
It seems to remind me to configure memory for the jobmanager instance 
explicitly, but I had already passed the jobmanager.memory.flink.size parameter. 
So I debugged the script and found a spelling error in the jobmanager.sh script 
at line 54:

 
{code:java}
parseJmArgsAndExportLogs "${ARGS[@]}"
{code}
the uppercase "$\{ARGS[@]}" is a wrong variable name here from a contextual 
perspective, and causing an empty string passed to the function. I changed to 
"$\{args[@]}" and It works fine.

 

 

 

  was:
I'm trying to use the jobmanager.sh script to create a jobmanager instance 
manually, and I need to pass arguments to the script dynamically, rather than 
through flink-conf.yaml. But I found that I didn't succeed in doing that when I 
commented out all configurations in the flink-conf.yaml and typed a command like:

 
{code:java}
./bin/jobmanager.sh start -D jobmanager.memory.flink.size=1024m -D 
jobmanager.rpc.address=xx.xx.xx.xx -D jobmanager.rpc.port=xxx -D 
jobmanager.bind-host=0.0.0.0 -Drest.address=xx.xx.xx.xx -Drest.port=xxx 
-Drest.bind-address=0.0.0.0{code}
but I got some errors below:

 
{code:java}
[ERROR] The execution result is empty.
[ERROR] Could not get JVM parameters and dynamic configurations properly.
[ERROR] Raw output from BashJavaUtils:
WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will 
impact performance.
Exception in thread "main" 
org.apache.flink.configuration.IllegalConfigurationException: JobManager memory 
configuration failed: Either required fine-grained memory 
(jobmanager.memory.heap.size), or Total Flink Memory size (Key: 
'jobmanager.memory.flink.size' , default: null (fallback keys: [])), or Total 
Process Memory size (Key: 'jobmanager.memory.process.size' , default: null 
(fallback keys: [])) need to be configured explicitly.
        at 
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(JobManagerProcessUtils.java:78)
        at 
org.apache.flink.runtime.util.bash.BashJavaUtils.getJmResourceParams(BashJavaUtils.java:98)
        at 
org.apache.flink.runti

[jira] [Updated] (FLINK-32909) The jobmanager.sh script fails to pass arguments

2023-08-22 Thread Alex Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex Wu updated FLINK-32909:

Description: 
I'm trying to use the jobmanager.sh script to create a jobmanager instance 
manually, and I need to pass arguments to the script dynamically, rather than 
through flink-conf.yaml. But I found that I didn't succeed in doing that when I 
commented out all configurations in the flink-conf.yaml and typed a command like:

 
{code:java}
./bin/jobmanager.sh start -D jobmanager.memory.flink.size=1024m -D 
jobmanager.rpc.address=xx.xx.xx.xx -D jobmanager.rpc.port=xxx -D 
jobmanager.bind-host=0.0.0.0 -Drest.address=xx.xx.xx.xx -Drest.port=xxx 
-Drest.bind-address=0.0.0.0{code}
but I got some errors below:

 
{code:java}
[ERROR] The execution result is empty.
[ERROR] Could not get JVM parameters and dynamic configurations properly.
[ERROR] Raw output from BashJavaUtils:
WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will 
impact performance.
Exception in thread "main" 
org.apache.flink.configuration.IllegalConfigurationException: JobManager memory 
configuration failed: Either required fine-grained memory 
(jobmanager.memory.heap.size), or Total Flink Memory size (Key: 
'jobmanager.memory.flink.size' , default: null (fallback keys: [])), or Total 
Process Memory size (Key: 'jobmanager.memory.process.size' , default: null 
(fallback keys: [])) need to be configured explicitly.
        at 
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(JobManagerProcessUtils.java:78)
        at 
org.apache.flink.runtime.util.bash.BashJavaUtils.getJmResourceParams(BashJavaUtils.java:98)
        at 
org.apache.flink.runtime.util.bash.BashJavaUtils.runCommand(BashJavaUtils.java:69)
        at 
org.apache.flink.runtime.util.bash.BashJavaUtils.main(BashJavaUtils.java:56)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Either 
required fine-grained memory (jobmanager.memory.heap.size), or Total Flink 
Memory size (Key: 'jobmanager.memory.flink.size' , default: null (fallback 
keys: [])), or Total Process Memory size (Key: 'jobmanager.memory.process.size' 
, default: null (fallback keys: [])) need to be configured explicitly.
        at 
org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils.failBecauseRequiredOptionsNotConfigured(ProcessMemoryUtils.java:129)
        at 
org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils.memoryProcessSpecFromConfig(ProcessMemoryUtils.java:86)
        at 
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.processSpecFromConfig(JobManagerProcessUtils.java:83)
        at 
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(JobM
 {code}
It seems to remind me to configure memory for the jobmanager instance 
explicitly, but I had already passed the jobmanager.memory.flink.size parameter. 
So I debugged the script and found a spelling error in the jobmanager.sh script 
at line 54:

 
{code:java}
parseJmArgsAndExportLogs "${ARGS[@]}"
{code}
the uppercase "$\{ARGS[@]}" is a wrong variable name here from a contextual 
perspective, and causing an empty string passed to the function. I changed to 
"$\{args[@]}" and It works fine.

 

 

 

  was:
I'm trying to use the jobmanager.sh script to create a jobmanager instance 
manually, and I need to pass arguments to the script dynamically, rather than 
through flink-conf.yaml. But I found that I didn't succeed in doing that when I 
commented out all configurations in the flink-conf.yaml and typed a command like:

 
{code:java}
./bin/jobmanager.sh start -D jobmanager.memory.flink.size=1024m -D 
jobmanager.rpc.address=xx.xx.xx.xx -D jobmanager.rpc.port=xxx -D 
jobmanager.bind-host=0.0.0.0 -Drest.address=xx.xx.xx.xx -Drest.port=xxx 
-Drest.bind-address=0.0.0.0{code}
but I got some errors below:

 
{code:java}
[ERROR] The execution result is empty.
[ERROR] Could not get JVM parameters and dynamic configurations properly.
[ERROR] Raw output from BashJavaUtils:
WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will 
impact performance.
Exception in thread "main" 
org.apache.flink.configuration.IllegalConfigurationException: JobManager memory 
configuration failed: Either required fine-grained memory 
(jobmanager.memory.heap.size), or Total Flink Memory size (Key: 
'jobmanager.memory.flink.size' , default: null (fallback keys: [])), or Total 
Process Memory size (Key: 'jobmanager.memory.process.size' , default: null 
(fallback keys: [])) need to be configured explicitly.
        at 
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(JobManagerProcessUtils.java:78)
        at 
org.apache.flink.runtime.util.bash.BashJavaUtils.getJmResourceParams(BashJavaUtils.java:98)
        at 
org.apache.flink.runtime.uti

[jira] [Created] (FLINK-32909) The jobmanager.sh script fails to pass arguments

2023-08-22 Thread Alex Wu (Jira)
Alex Wu created FLINK-32909:
---

 Summary: The jobmanager.sh script fails to pass arguments
 Key: FLINK-32909
 URL: https://issues.apache.org/jira/browse/FLINK-32909
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Scripts
Affects Versions: 1.17.1, 1.16.2
Reporter: Alex Wu


I'm trying to use the jobmanager.sh script to create a jobmanager instance 
manually, and I need to pass arguments to the script dynamically, rather than 
through flink-conf.yaml. But I found that I didn't succeed in doing that when I 
commented out all configurations in the flink-conf.yaml and typed a command like:

 
{code:java}
./bin/jobmanager.sh start -D jobmanager.memory.flink.size=1024m -D 
jobmanager.rpc.address=xx.xx.xx.xx -D jobmanager.rpc.port=xxx -D 
jobmanager.bind-host=0.0.0.0 -Drest.address=xx.xx.xx.xx -Drest.port=xxx 
-Drest.bind-address=0.0.0.0{code}
but I got some errors below:

 
{code:java}
[ERROR] The execution result is empty.
[ERROR] Could not get JVM parameters and dynamic configurations properly.
[ERROR] Raw output from BashJavaUtils:
WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will 
impact performance.
Exception in thread "main" 
org.apache.flink.configuration.IllegalConfigurationException: JobManager memory 
configuration failed: Either required fine-grained memory 
(jobmanager.memory.heap.size), or Total Flink Memory size (Key: 
'jobmanager.memory.flink.size' , default: null (fallback keys: [])), or Total 
Process Memory size (Key: 'jobmanager.memory.process.size' , default: null 
(fallback keys: [])) need to be configured explicitly.
        at 
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(JobManagerProcessUtils.java:78)
        at 
org.apache.flink.runtime.util.bash.BashJavaUtils.getJmResourceParams(BashJavaUtils.java:98)
        at 
org.apache.flink.runtime.util.bash.BashJavaUtils.runCommand(BashJavaUtils.java:69)
        at 
org.apache.flink.runtime.util.bash.BashJavaUtils.main(BashJavaUtils.java:56)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Either 
required fine-grained memory (jobmanager.memory.heap.size), or Total Flink 
Memory size (Key: 'jobmanager.memory.flink.size' , default: null (fallback 
keys: [])), or Total Process Memory size (Key: 'jobmanager.memory.process.size' 
, default: null (fallback keys: [])) need to be configured explicitly.
        at 
org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils.failBecauseRequiredOptionsNotConfigured(ProcessMemoryUtils.java:129)
        at 
org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils.memoryProcessSpecFromConfig(ProcessMemoryUtils.java:86)
        at 
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.processSpecFromConfig(JobManagerProcessUtils.java:83)
        at 
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(JobM
 {code}
It seems to remind me to configure memory for the jobmanager instance 
explicitly, but I had already passed the jobmanager.memory.flink.size parameter. 
So I debugged the script and found a spelling error in the jobmanager.sh script 
at line 54:

 
{code:java}
parseJmArgsAndExportLogs "${ARGS[@]}"
{code}
the uppercase "${ARGS[@]}" is a wrong variable name here from a contextual 
perspective, and causing an empty string passed to the function. I changed to 
"${args[@]}" and It works fine.

 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32776) Adding HTTP basic authentication support to PrometheusPushGatewayReporter

2023-08-14 Thread Qiwei Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17754379#comment-17754379
 ] 

Qiwei Wu commented on FLINK-32776:
--

Hi [~martijnvisser], can you please assign this ticket to me? I've already 
submitted a pull request to accomplish this.

> Adding HTTP basic authentication support to PrometheusPushGatewayReporter
> -
>
> Key: FLINK-32776
> URL: https://issues.apache.org/jira/browse/FLINK-32776
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Metrics
>Affects Versions: 1.17.1
>Reporter: Qiwei Wu
>Priority: Major
>  Labels: pull-request-available
>
> For security reasons, PushGateways generally enable HTTP basic authentication,
> but currently Flink does not support pushing metrics to a PushGateway that 
> has HTTP basic authentication turned on.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-32776) Adding HTTP basic authentication support to PrometheusPushGatewayReporter

2023-08-14 Thread Qiwei Wu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-32776 ]


Qiwei Wu deleted comment on FLINK-32776:
--

was (Author: JIRAUSER301673):
[~Weijie Guo]  can you please assign this ticket to [~stonewu] ?

> Adding HTTP basic authentication support to PrometheusPushGatewayReporter
> -
>
> Key: FLINK-32776
> URL: https://issues.apache.org/jira/browse/FLINK-32776
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Metrics
>Affects Versions: 1.17.1
>Reporter: Qiwei Wu
>Priority: Major
>  Labels: pull-request-available
>
> For security reasons, PushGateways generally enable HTTP basic authentication,
> but currently Flink does not support pushing metrics to a PushGateway that 
> has HTTP basic authentication turned on.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32776) Adding HTTP basic authentication support to PrometheusPushGatewayReporter

2023-08-10 Thread Qiwei Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752722#comment-17752722
 ] 

Qiwei Wu commented on FLINK-32776:
--

[~Weijie Guo], can you please assign this ticket to [~stonewu]?

> Adding HTTP basic authentication support to PrometheusPushGatewayReporter
> -
>
> Key: FLINK-32776
> URL: https://issues.apache.org/jira/browse/FLINK-32776
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Metrics
>Affects Versions: 1.17.1
>Reporter: Qiwei Wu
>Priority: Major
>  Labels: pull-request-available
>
> For security reasons, PushGateways generally enable HTTP basic authentication,
> but currently Flink does not support pushing metrics to a PushGateway that 
> has HTTP basic authentication turned on.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32776) Adding HTTP basic authentication support to PrometheusPushGatewayReporter

2023-08-08 Thread Qiwei Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qiwei Wu updated FLINK-32776:
-
Remaining Estimate: (was: 1h)
 Original Estimate: (was: 1h)

> Adding HTTP basic authentication support to PrometheusPushGatewayReporter
> -
>
> Key: FLINK-32776
> URL: https://issues.apache.org/jira/browse/FLINK-32776
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Metrics
>Affects Versions: 1.17.1
>Reporter: Qiwei Wu
>Priority: Major
>  Labels: pull-request-available
>
> For security reasons, PushGateways generally enable HTTP basic authentication,
> but currently Flink does not support pushing metrics to a PushGateway that 
> has HTTP basic authentication turned on.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32776) Adding HTTP basic authentication support to PrometheusPushGatewayReporter

2023-08-07 Thread Qiwei Wu (Jira)
Qiwei Wu created FLINK-32776:


 Summary: Adding HTTP basic authentication support to 
PrometheusPushGatewayReporter
 Key: FLINK-32776
 URL: https://issues.apache.org/jira/browse/FLINK-32776
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Metrics
Affects Versions: 1.17.1
Reporter: Qiwei Wu


For security reasons, PushGateways generally enable HTTP basic authentication, 
but currently Flink does not support pushing metrics to a PushGateway that has 
HTTP basic authentication turned on.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32279) Shuffle HashJoin supports spill to disk when operator fusion codegen is enabled

2023-07-23 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-32279.
---
Fix Version/s: 1.18.0
 Assignee: dalongliu
   Resolution: Fixed

Fixed in master: bff837622ddfd2cc8ee2f605eea953c7bbf0cb4b and 
a0a66d7039883c23e49f1069d860b65fe0f6b530

> Shuffle HashJoin supports spill to disk when operator fusion codegen is enabled
> --
>
> Key: FLINK-32279
> URL: https://issues.apache.org/jira/browse/FLINK-32279
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32280) HashAgg supports operator fusion codegen

2023-07-22 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-32280.
---
Fix Version/s: 1.18.0
 Assignee: dalongliu
   Resolution: Fixed

Fixed in master: 
f500b59391161409fc8fb909db0744068975269d...6fd47ec7cbfad295d4430f85624fc1b14e215b10

> HashAgg supports operator fusion codegen
> ---
>
> Key: FLINK-32280
> URL: https://issues.apache.org/jira/browse/FLINK-32280
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32610) JSON format supports projection push down

2023-07-20 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-32610:

Release Note: The JSON format introduced JsonParser as a new default way to 
deserialize JSON data. JsonParser is a Jackson JSON streaming API to read JSON 
data, which is much faster and consumes less memory than the previous JsonNode 
approach. This should be a compatible change; if you encounter any issues after 
upgrading, you can fall back to the previous JsonNode approach by setting 
`json.decode.json-parser.enabled` to `false`.   (was: The JSON format 
introduced JsonParser as a new default way to deserialize JSON data. JsonParser 
is a Jackson JSON streaming API to read JSON data which is much faster and 
consumes less memory compared to the previous JsonNode approach. This should be 
a compatible change, if you encounter any issues after upgrade, you can 
fallback to the previous JsonNode approach by setting 
`json.decode.json-parser.enabled` to `false`. )

> JSON format supports projection push down
> -
>
> Key: FLINK-32610
> URL: https://issues.apache.org/jira/browse/FLINK-32610
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32610) JSON format supports projection push down

2023-07-20 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-32610.
---
Fix Version/s: 1.18.0
 Release Note: The JSON format introduced JsonParser as a new default way 
to deserialize JSON data. JsonParser is a Jackson JSON streaming API to read 
JSON data which is much faster and consumes less memory compared to the 
previous JsonNode approach. This should be a compatible change, if you 
encounter any issues after upgrade, you can fallback to the previous JsonNode 
approach by setting `json.decode.json-parser.enabled` to `false`. 
 Assignee: dalongliu
   Resolution: Fixed

Fixed in master: 
58f7ceb45013bca847b39901f62ce746680e4f0c...d6967dd7301e82fa102f756e16635dabce1c550d
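For reference, a minimal sketch of the fallback mentioned in the release note.
The connector, topic, and schema below are illustrative assumptions; only the
`json.decode.json-parser.enabled` option comes from this issue.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JsonParserFallbackSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Setting the option to 'false' reverts the JSON format to the
        // legacy JsonNode-based deserializer.
        tEnv.executeSql(
                "CREATE TABLE events ("
                        + "  id BIGINT,"
                        + "  payload STRING"
                        + ") WITH ("
                        + "  'connector' = 'kafka',"
                        + "  'topic' = 'events',"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',"
                        + "  'format' = 'json',"
                        + "  'json.decode.json-parser.enabled' = 'false'"
                        + ")");
    }
}
{code}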

> JSON format supports projection push down
> -
>
> Key: FLINK-32610
> URL: https://issues.apache.org/jira/browse/FLINK-32610
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32632) Run Kubernetes test is unstable on AZP

2023-07-20 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17745130#comment-17745130
 ] 

Jark Wu edited comment on FLINK-32632 at 7/20/23 3:26 PM:
--

Another instance: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51492&view=logs&j=bea52777-eaf8-5663-8482-18fbc3630e81&t=43ba8ce7-ebbf-57cd-9163-444305d74117&l=6212


was (Author: jark):
Another instance: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51492&view=results

> Run Kubernetes test is unstable on AZP
> --
>
> Key: FLINK-32632
> URL: https://issues.apache.org/jira/browse/FLINK-32632
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Priority: Critical
>  Labels: test-stability
>
> This test 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51447&view=logs&j=bea52777-eaf8-5663-8482-18fbc3630e81&t=43ba8ce7-ebbf-57cd-9163-444305d74117&l=6213
> fails with
> {noformat}
> 2023-07-19T17:14:49.8144730Z Jul 19 17:14:49 
> deployment.apps/flink-task-manager created
> 2023-07-19T17:15:03.7983703Z Jul 19 17:15:03 job.batch/flink-job-cluster 
> condition met
> 2023-07-19T17:15:04.0937620Z error: Internal error occurred: error executing 
> command in container: http: invalid Host header
> 2023-07-19T17:15:04.0988752Z sort: cannot read: 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-11919909188/out/kubernetes_wc_out*':
>  No such file or directory
> 2023-07-19T17:15:04.1017388Z Jul 19 17:15:04 FAIL WordCount: Output hash 
> mismatch.  Got d41d8cd98f00b204e9800998ecf8427e, expected 
> e682ec6622b5e83f2eb614617d5ab2cf.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32632) Run Kubernetes test is unstable on AZP

2023-07-20 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17745130#comment-17745130
 ] 

Jark Wu commented on FLINK-32632:
-

Another instance: 
https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_apis/build/builds/51492/logs/110

> Run Kubernetes test is unstable on AZP
> --
>
> Key: FLINK-32632
> URL: https://issues.apache.org/jira/browse/FLINK-32632
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Priority: Critical
>  Labels: test-stability
>
> This test 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51447&view=logs&j=bea52777-eaf8-5663-8482-18fbc3630e81&t=43ba8ce7-ebbf-57cd-9163-444305d74117&l=6213
> fails with
> {noformat}
> 2023-07-19T17:14:49.8144730Z Jul 19 17:14:49 
> deployment.apps/flink-task-manager created
> 2023-07-19T17:15:03.7983703Z Jul 19 17:15:03 job.batch/flink-job-cluster 
> condition met
> 2023-07-19T17:15:04.0937620Z error: Internal error occurred: error executing 
> command in container: http: invalid Host header
> 2023-07-19T17:15:04.0988752Z sort: cannot read: 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-11919909188/out/kubernetes_wc_out*':
>  No such file or directory
> 2023-07-19T17:15:04.1017388Z Jul 19 17:15:04 FAIL WordCount: Output hash 
> mismatch.  Got d41d8cd98f00b204e9800998ecf8427e, expected 
> e682ec6622b5e83f2eb614617d5ab2cf.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32632) Run Kubernetes test is unstable on AZP

2023-07-20 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17745130#comment-17745130
 ] 

Jark Wu edited comment on FLINK-32632 at 7/20/23 3:25 PM:
--

Another instance: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51492&view=results


was (Author: jark):
Another instance: 
https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_apis/build/builds/51492/logs/110

> Run Kubernetes test is unstable on AZP
> --
>
> Key: FLINK-32632
> URL: https://issues.apache.org/jira/browse/FLINK-32632
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Priority: Critical
>  Labels: test-stability
>
> This test 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51447&view=logs&j=bea52777-eaf8-5663-8482-18fbc3630e81&t=43ba8ce7-ebbf-57cd-9163-444305d74117&l=6213
> fails with
> {noformat}
> 2023-07-19T17:14:49.8144730Z Jul 19 17:14:49 
> deployment.apps/flink-task-manager created
> 2023-07-19T17:15:03.7983703Z Jul 19 17:15:03 job.batch/flink-job-cluster 
> condition met
> 2023-07-19T17:15:04.0937620Z error: Internal error occurred: error executing 
> command in container: http: invalid Host header
> 2023-07-19T17:15:04.0988752Z sort: cannot read: 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-11919909188/out/kubernetes_wc_out*':
>  No such file or directory
> 2023-07-19T17:15:04.1017388Z Jul 19 17:15:04 FAIL WordCount: Output hash 
> mismatch.  Got d41d8cd98f00b204e9800998ecf8427e, expected 
> e682ec6622b5e83f2eb614617d5ab2cf.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30559) May get wrong result for `if` expression if it's string data type

2023-07-19 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-30559:

Fix Version/s: 1.18.0
   1.16.3
   1.17.2

> May get wrong result for `if` expression if it's string data type
> -
>
> Key: FLINK-30559
> URL: https://issues.apache.org/jira/browse/FLINK-30559
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> Can be reproduced by the following code in 
> `org.apache.flink.table.planner.runtime.batch.sql.CalcITCase`
>  
> {code:java}
> checkResult("SELECT if(b > 10, 'ua', c) from Table3", data3) {code}
> The actual result is [co, He, He, ...].
> It seems it only gets the first two characters.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-30559) May get wrong result for `if` expression if it's string data type

2023-07-19 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu resolved FLINK-30559.
-
Resolution: Duplicate

> May get wrong result for `if` expression if it's string data type
> -
>
> Key: FLINK-30559
> URL: https://issues.apache.org/jira/browse/FLINK-30559
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
>  Labels: pull-request-available
>
> Can be reproduced by the following code in 
> `org.apache.flink.table.planner.runtime.batch.sql.CalcITCase`
>  
> {code:java}
> checkResult("SELECT if(b > 10, 'ua', c) from Table3", data3) {code}
> The actual result is [co, He, He, ...].
> It seems it only gets the first two characters.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (FLINK-30559) May get wrong result for `if` expression if it's string data type

2023-07-19 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reopened FLINK-30559:
-

> May get wrong result for `if` expression if it's string data type
> -
>
> Key: FLINK-30559
> URL: https://issues.apache.org/jira/browse/FLINK-30559
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
>  Labels: pull-request-available
>
> Can be reproduced by the following code in 
> `org.apache.flink.table.planner.runtime.batch.sql.CalcITCase`
>  
> {code:java}
> checkResult("SELECT if(b > 10, 'ua', c) from Table3", data3) {code}
> The actual result is [co, He, He, ...].
> It seems it only gets the first two characters.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32277) Introduce operator fusion codegen basic framework

2023-07-12 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-32277.
---
Fix Version/s: 1.18.0
 Assignee: dalongliu
   Resolution: Fixed

Fixed in master: 02be55ab780186a4b738a331e17c436ad2543de3 and 
1f062a38f43cdc9fdbaab3c68e1ec1c1ffdb88b5

> Introduce operator fusion codegen basic framework
> -
>
> Key: FLINK-32277
> URL: https://issues.apache.org/jira/browse/FLINK-32277
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32278) HashJoin supports operator fusion codegen

2023-07-12 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742454#comment-17742454
 ] 

Jark Wu edited comment on FLINK-32278 at 7/12/23 2:55 PM:
--

Fixed in master with 6040f480aae92f5449eb548a9eb3e042442b2bbb and 
f03c26a6f0189b78975828dc77482e5ba1e53641


was (Author: jark):
Fixed in master with 
02be55ab780186a4b738a331e17c436ad2543de3...f480b6d313f64ff5d8d1e6f1d74f2dacc3ea133b

> HashJoin supports operator fusion codegen
> -
>
> Key: FLINK-32278
> URL: https://issues.apache.org/jira/browse/FLINK-32278
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32278) HashJoin supports operator fusion codegen

2023-07-12 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-32278.
---
Fix Version/s: 1.18.0
 Assignee: dalongliu
   Resolution: Fixed

Fixed in master with 
02be55ab780186a4b738a331e17c436ad2543de3...f480b6d313f64ff5d8d1e6f1d74f2dacc3ea133b

> HashJoin supports operator fusion codegen
> -
>
> Key: FLINK-32278
> URL: https://issues.apache.org/jira/browse/FLINK-32278
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32426) Fix adaptive local hash agg not working when auxGrouping exists

2023-06-29 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-32426.
---
Fix Version/s: (was: 1.17.2)
 Assignee: dalongliu
   Resolution: Fixed

Fixed in master: 3f485162a372818c1402d78bf9fb25e06ca1cdf7

> Fix adaptive local hash agg can't work when auxGrouping exist
> -
>
> Key: FLINK-32426
> URL: https://issues.apache.org/jira/browse/FLINK-32426
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.18.0, 1.17.1
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> For the following case, the field `a` is the primary key, and we select from 
> `AuxGroupingTable` and group by a, b. Since a is the primary key, it also 
> guarantees uniqueness, so the planner will extract b as an auxGrouping field.
> {code:java}
> registerCollection(
>   "AuxGroupingTable",
>   data2,
>   type2,
>   "a, b, c, d, e",
>   nullablesOfData2,
>   FlinkStatistic.builder().uniqueKeys(Set(Set("a").asJava).asJava).build())
> checkResult(
>   "SELECT a, b, COUNT(c) FROM AuxGroupingTable GROUP BY a, b",
>   Seq(
> row(1, 1, 1),
> row(2, 3, 2),
> row(3, 4, 3),
> row(4, 10, 4),
> row(5, 11, 5)
>   )
> ) {code}
>  
> Because the generated code doesn't read the auxGrouping fields from the input 
> RowData and set them on the aggBuffer, the aggBuffer RowData loses some fields, 
> and an index exception is thrown when accessing them, as follows:
> {code:java}
> Caused by: java.lang.AssertionError: index (1) should < 1
>     at 
> org.apache.flink.table.data.binary.BinaryRowData.assertIndexIsValid(BinaryRowData.java:127)
>     at 
> org.apache.flink.table.data.binary.BinaryRowData.isNullAt(BinaryRowData.java:156)
>     at 
> org.apache.flink.table.data.utils.JoinedRowData.isNullAt(JoinedRowData.java:113)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:201)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48)
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:165)
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:43)
>     at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>     at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:141)
>     at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>     at 
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55)
>     at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:134)
>     at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:114)
>     at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:95)
>     at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:48)
>     at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59)
>     at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31)
>     at LocalHashAggregateWithKeys$39.processElement_split2(Unknown Source)
>     at LocalHashAggregateWithKeys$39.processElement(Unknown Source)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:108)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:77)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>     at BatchExecCalc$10.processElement(Unknown Source)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:108)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:77)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>     at SourceConversion$6.processElement(Unknown Source)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:108)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:77)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>     at 
> org.apache.flink.streaming.api.operators

[jira] [Commented] (FLINK-32426) Fix adaptive local hash agg can't work when auxGrouping exist

2023-06-29 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738543#comment-17738543
 ] 

Jark Wu commented on FLINK-32426:
-

[~lsy], do we need to fix it in 1.17.2 as well? If yes, could you help create a 
PR for the release-1.17 branch? 

> Fix adaptive local hash agg can't work when auxGrouping exist
> -
>
> Key: FLINK-32426
> URL: https://issues.apache.org/jira/browse/FLINK-32426
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.18.0, 1.17.1
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> For the following case, the field `a` is the primary key, and we select from 
> `AuxGroupingTable` and group by a, b. Since a is the primary key, it also 
> guarantees uniqueness, so the planner will extract b as an auxGrouping field.
> {code:java}
> registerCollection(
>   "AuxGroupingTable",
>   data2,
>   type2,
>   "a, b, c, d, e",
>   nullablesOfData2,
>   FlinkStatistic.builder().uniqueKeys(Set(Set("a").asJava).asJava).build())
> checkResult(
>   "SELECT a, b, COUNT(c) FROM AuxGroupingTable GROUP BY a, b",
>   Seq(
> row(1, 1, 1),
> row(2, 3, 2),
> row(3, 4, 3),
> row(4, 10, 4),
> row(5, 11, 5)
>   )
> ) {code}
>  
> Because the generated code doesn't read the auxGrouping fields from the input 
> RowData and set them on the aggBuffer, the aggBuffer RowData loses some fields, 
> and an index exception is thrown when accessing them, as follows:
> {code:java}
> Caused by: java.lang.AssertionError: index (1) should < 1
>     at 
> org.apache.flink.table.data.binary.BinaryRowData.assertIndexIsValid(BinaryRowData.java:127)
>     at 
> org.apache.flink.table.data.binary.BinaryRowData.isNullAt(BinaryRowData.java:156)
>     at 
> org.apache.flink.table.data.utils.JoinedRowData.isNullAt(JoinedRowData.java:113)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:201)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48)
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:165)
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:43)
>     at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>     at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:141)
>     at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>     at 
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55)
>     at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:134)
>     at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:114)
>     at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:95)
>     at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:48)
>     at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59)
>     at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31)
>     at LocalHashAggregateWithKeys$39.processElement_split2(Unknown Source)
>     at LocalHashAggregateWithKeys$39.processElement(Unknown Source)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:108)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:77)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>     at BatchExecCalc$10.processElement(Unknown Source)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:108)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:77)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>     at SourceConversion$6.processElement(Unknown Source)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:108)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:77)
>     at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>     at 
> org.apache.flink.

[jira] [Commented] (FLINK-32444) Enable object reuse for Flink SQL jobs by default

2023-06-26 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737470#comment-17737470
 ] 

Jark Wu commented on FLINK-32444:
-

cc [~lincoln.86xy], [~lsy], [~twalthr], what do you think?

> Enable object reuse for Flink SQL jobs by default
> -
>
> Key: FLINK-32444
> URL: https://issues.apache.org/jira/browse/FLINK-32444
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Jark Wu
>Priority: Major
> Fix For: 1.18.0
>
>
> Currently, object reuse is not enabled by default for Flink streaming jobs, 
> but is enabled by default for Flink batch jobs. That is not consistent with 
> stream-batch unification. Besides, SQL operators are safe for object reuse, 
> and enabling it is a great performance improvement for SQL jobs. 
> We should also be careful with the Table-DataStream conversion case 
> (StreamTableEnvironment), which is not safe for object reuse by default. 
> Maybe we can just enable it for SQL Client/Gateway and TableEnvironment. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32444) Enable object reuse for Flink SQL jobs by default

2023-06-26 Thread Jark Wu (Jira)
Jark Wu created FLINK-32444:
---

 Summary: Enable object reuse for Flink SQL jobs by default
 Key: FLINK-32444
 URL: https://issues.apache.org/jira/browse/FLINK-32444
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Reporter: Jark Wu
 Fix For: 1.18.0


Currently, object reuse is not enabled by default for Flink streaming jobs, but 
is enabled by default for Flink batch jobs. That is not consistent with 
stream-batch unification. Besides, SQL operators are safe for object reuse, and 
enabling it is a great performance improvement for SQL jobs. 

We should also be careful with the Table-DataStream conversion case 
(StreamTableEnvironment), which is not safe for object reuse by default. 
Maybe we can just enable it for SQL Client/Gateway and TableEnvironment. 
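
For reference, object reuse can already be enabled explicitly today through the 
existing pipeline.object-reuse option; a minimal sketch (the actual table 
program is omitted):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ObjectReuseExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Existing option, equivalent to ExecutionConfig#enableObjectReuse();
        // this issue proposes making it the default for SQL jobs.
        conf.setString("pipeline.object-reuse", "true");
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance()
                                .inStreamingMode()
                                .withConfiguration(conf)
                                .build());
        // ... register tables and execute SQL as usual
    }
}
{code}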



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32263) Add ELT support in SQL & Table API

2023-06-25 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736961#comment-17736961
 ] 

Jark Wu edited comment on FLINK-32263 at 6/26/23 2:44 AM:
--

You can follow the way of 
[{{jsonArrayAgg}}|https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java#L915]
 which is defined as a static method. Users can call it in this way:

{code}
orders.select(ELT(lit(1), $("f0")));
{code}


was (Author: jark):
You can follow the way of 
[{{jsonArrayAgg}}|https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java#L915]
 which is defined as a static method. Users can calling it in this way:

{code}
orders.select(ELT(lit(1), $("f0")));
{code}

> Add ELT support in SQL & Table API
> --
>
> Key: FLINK-32263
> URL: https://issues.apache.org/jira/browse/FLINK-32263
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Bonnie Varghese
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Implement the elt function to extract the n-th input value from a list of 
> inputs.
> Description:
> The elt function in the ETL pipeline extracts the value at the n-th position 
> from a list of input values. It is similar to array indexing, where the first 
> element is at position 1. This function provides a convenient way to retrieve 
> specific elements from a list of inputs.
> Syntax:
>  
> {code:java}
> elt[n: int, *inputs: str] -> str or None{code}
>  
> Arguments:
> n: The index of the input value to extract. It should be a positive integer.
> *inputs: Variable-length arguments representing the list of inputs.
> Returns:
> The value at the n-th position in the list of inputs. If the index exceeds 
> the length of the array, the function returns NULL. 
> Examples:
> Retrieving the second element from a list of strings:
> {code:java}
> elt(2, 'scala', 'java')
> Output: 'java'{code}
> Retrieving the second element from a list of mixed types:
> {code:java}
> result = elt(2, 'a', 1)
> Output: 1{code}
> See also:
>  
> spark:[https://spark.apache.org/docs/latest/api/sql/index.html#elt]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32263) Add ELT support in SQL & Table API

2023-06-25 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736961#comment-17736961
 ] 

Jark Wu commented on FLINK-32263:
-

You can follow the way of 
[{{jsonArrayAgg}}|https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java#L915]
 which is defined as a static method. Users can call it in this way:

{code}
orders.select(ELT(lit(1), $("f0")));
{code}
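
For reference, a rough sketch of what such a static method could look like; 
note that BuiltInFunctionDefinitions.ELT and the apiCall helper are assumptions 
of this sketch, since the function is not merged yet:

{code:java}
// Hypothetical addition to org.apache.flink.table.api.Expressions, mirroring
// the jsonArrayAgg pattern; BuiltInFunctionDefinitions.ELT does not exist yet
// and is an assumption of this sketch.
public static ApiExpression elt(Object index, Object... inputs) {
    Object[] args = new Object[inputs.length + 1];
    args[0] = index;
    System.arraycopy(inputs, 0, args, 1, inputs.length);
    return apiCall(BuiltInFunctionDefinitions.ELT, args);
}
{code}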

> Add ELT support in SQL & Table API
> --
>
> Key: FLINK-32263
> URL: https://issues.apache.org/jira/browse/FLINK-32263
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Bonnie Varghese
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Implement the elt function to extract the n-th input value from a list of 
> inputs.
> Description:
> The elt function in the ETL pipeline extracts the value at the n-th position 
> from a list of input values. It is similar to array indexing, where the first 
> element is at position 1. This function provides a convenient way to retrieve 
> specific elements from a list of inputs.
> Syntax:
>  
> {code:java}
> elt[n: int, *inputs: str] -> str or None{code}
>  
> Arguments:
> n: The index of the input value to extract. It should be a positive integer.
> *inputs: Variable-length arguments representing the list of inputs.
> Returns:
> The value at the n-th position in the list of inputs. If the index exceeds 
> the length of the array, the function returns NULL. 
> Examples:
> Retrieving the second element from a list of strings:
> {code:java}
> elt(2, 'scala', 'java')
> Output: 'java'{code}
> Retrieving the second element from a list of mixed types:
> {code:java}
> result = elt(2, 'a', 1)
> Output: 1{code}
> See also:
>  
> spark:[https://spark.apache.org/docs/latest/api/sql/index.html#elt]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32426) Fix adaptive local hash can't work when auxGrouping exist

2023-06-25 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736954#comment-17736954
 ] 

Jark Wu commented on FLINK-32426:
-

Could you add more description about what error will be thrown in which case? 
That would be helpful for SEO for users searching similar problems. 

> Fix adaptive local hash can't work when auxGrouping exist
> -
>
> Key: FLINK-32426
> URL: https://issues.apache.org/jira/browse/FLINK-32426
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.18.0, 1.17.1
>Reporter: dalongliu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.17.2
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32434) missing s3 config by catalog

2023-06-25 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-32434.
---
Resolution: Invalid

This is an exception thrown by the Paimon catalog; please report this issue to 
the Paimon community.
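
As a workaround sketch (assuming the credentials are meant to be read by 
Flink's S3 filesystem plugin rather than by the catalog), the s3.* keys can be 
placed in flink-conf.yaml:

{code}
# flink-conf.yaml (workaround sketch; values are placeholders)
s3.endpoint: your-endpoint-hostname
s3.access-key: correct-user
s3.secret-key: yyy
{code}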

> missing s3 config by catalog
> 
>
> Key: FLINK-32434
> URL: https://issues.apache.org/jira/browse/FLINK-32434
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems
>Affects Versions: 1.17.0
>Reporter: gary
>Priority: Major
>  Labels: catalog, file-system
>
> After FLINK-30704, per-catalog s3 config (keys with the s3. prefix) doesn't 
> work. It works fine in Flink 1.16.
> example: [https://paimon.apache.org/docs/master/filesystems/s3/]
>  
> {code:java}
> CREATE CATALOG my_catalog WITH (
> 'type' = 'paimon',
> 'warehouse' = 's3://path/to/warehouse',
> 's3.endpoint' = 'your-endpoint-hostname',
> 's3.access-key' = 'correct-user',
> 's3.secret-key' = 'yyy'
> ); {code}
> exception: 
> {code:java}
> Caused by: org.apache.flink.table.api.ValidationException: Unable to create 
> catalog 'xxx'
> Catalog options are:
> 's3.access-key'='correct-user'
> 's3.endpoint'='http://x.x.x.x:xx/'
> 's3.secret-key'='***'
> 'type'='paimon'
> 'warehouse'='s3://xxx/'
> at 
> org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:439)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1466)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1212)
> at 
> org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:541)
> at 
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:440)
> at 
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
> at 
> org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
> at 
> org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
> at 
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
> ... 7 more
> Caused by: java.lang.RuntimeException: java.nio.file.AccessDeniedException: 
> s3://xxx/default.db: getFileStatus on s3://xxx/default.db: 
> com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon 
> S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 176A9F853B408F0E; 
> S3 Extended Request ID: 
> e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855; Proxy: 
> null), S3 Extended Request ID: 
> e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855:403 Forbidden
> at 
> org.apache.paimon.catalog.FileSystemCatalog.uncheck(FileSystemCatalog.java:207)
> at 
> org.apache.paimon.catalog.FileSystemCatalog.createDatabase(FileSystemCatalog...
> at org.apache.paimon.flink.FlinkCatalog.<init>(FlinkCatalog.java:112)
> at 
> org.apache.paimon.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:70)
> at 
> org.apache.paimon.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:58)
> at 
> org.apache.paimon.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:32)
> at 
> org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:436)
> ... 15 more {code}
> Flink accesses S3 with the user defined in flink-conf.yaml rather than the 
> 'correct-user' configured in the catalog, which causes the Forbidden error.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28620) SQL Client doesn't properly print values of INTERVAL type

2023-06-25 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-28620.
---
Fix Version/s: (was: 1.18.0)
 Assignee: (was: Jark Wu)
   Resolution: Cannot Reproduce

> SQL Client doesn't properly print values of INTERVAL type
> -
>
> Key: FLINK-28620
> URL: https://issues.apache.org/jira/browse/FLINK-28620
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Reporter: Jark Wu
>Priority: Major
>
> The display of values of interval type should follow the CAST rules. However, 
> currently, SQL Client prints it using {{Period.toString()}} and 
> {{Duration.toString()}} which is not SQL standard compliant. 
> {code}
> Flink SQL> select interval '9-11' year to month;
> +--------+
> | EXPR$0 |
> +--------+
> |  P119M |
> +--------+
> 1 row in set
> Flink SQL> select cast(interval '9-11' year to month as varchar);
> +--------+
> | EXPR$0 |
> +--------+
> |  +9-11 |
> +--------+
> 1 row in set
> Flink SQL> select interval '2 1:2:3' day to second;
> +-----------+
> |    EXPR$0 |
> +-----------+
> | PT49H2M3S |
> +-----------+
> 1 row in set
> Flink SQL> select cast(interval '2 1:2:3' day to second as varchar);
> +-----------------+
> |          EXPR$0 |
> +-----------------+
> | +2 01:02:03.000 |
> +-----------------+
> 1 row in set
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28620) SQL Client doesn't properly print values of INTERVAL type

2023-06-25 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736834#comment-17736834
 ] 

Jark Wu commented on FLINK-28620:
-

Thank you for the validation. I will close it then. 

> SQL Client doesn't properly print values of INTERVAL type
> -
>
> Key: FLINK-28620
> URL: https://issues.apache.org/jira/browse/FLINK-28620
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
> Fix For: 1.18.0
>
>
> The display of values of interval type should follow the CAST rules. However, 
> currently, SQL Client prints it using {{Period.toString()}} and 
> {{Duration.toString()}} which is not SQL standard compliant. 
> {code}
> Flink SQL> select interval '9-11' year to month;
> +--------+
> | EXPR$0 |
> +--------+
> |  P119M |
> +--------+
> 1 row in set
> Flink SQL> select cast(interval '9-11' year to month as varchar);
> +--------+
> | EXPR$0 |
> +--------+
> |  +9-11 |
> +--------+
> 1 row in set
> Flink SQL> select interval '2 1:2:3' day to second;
> +-----------+
> |    EXPR$0 |
> +-----------+
> | PT49H2M3S |
> +-----------+
> 1 row in set
> Flink SQL> select cast(interval '2 1:2:3' day to second as varchar);
> +-----------------+
> |          EXPR$0 |
> +-----------------+
> | +2 01:02:03.000 |
> +-----------------+
> 1 row in set
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-28620) SQL Client doesn't properly print values of INTERVAL type

2023-06-25 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736834#comment-17736834
 ] 

Jark Wu edited comment on FLINK-28620 at 6/25/23 7:52 AM:
--

[~Sergey Nuyanzin] Thank you for the validation. I will close it then. 


was (Author: jark):
Thank you for the validation. I will close it then. 

> SQL Client doesn't properly print values of INTERVAL type
> -
>
> Key: FLINK-28620
> URL: https://issues.apache.org/jira/browse/FLINK-28620
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
> Fix For: 1.18.0
>
>
> The display of values of interval type should follow the CAST rules. However, 
> currently, SQL Client prints it using {{Period.toString()}} and 
> {{Duration.toString()}} which is not SQL standard compliant. 
> {code}
> Flink SQL> select interval '9-11' year to month;
> +--------+
> | EXPR$0 |
> +--------+
> |  P119M |
> +--------+
> 1 row in set
> Flink SQL> select cast(interval '9-11' year to month as varchar);
> +--------+
> | EXPR$0 |
> +--------+
> |  +9-11 |
> +--------+
> 1 row in set
> Flink SQL> select interval '2 1:2:3' day to second;
> +-----------+
> |    EXPR$0 |
> +-----------+
> | PT49H2M3S |
> +-----------+
> 1 row in set
> Flink SQL> select cast(interval '2 1:2:3' day to second as varchar);
> +-----------------+
> |          EXPR$0 |
> +-----------------+
> | +2 01:02:03.000 |
> +-----------------+
> 1 row in set
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32332) Jar files for catalog function are not listed correctly

2023-06-16 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17733390#comment-17733390
 ] 

Jark Wu edited comment on FLINK-32332 at 6/16/23 8:11 AM:
--

[~zjureel], currently, SHOW JARS is used to display the jars added by ADD JAR 
statements. So it is expected that it does not list jar files of catalog 
functions.  
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/jar/#show-jars


was (Author: jark):
[~zjureel], currently, SHOW JARS is used to display the jars added by ADD JAR 
statements. So it is expected that it does not list jar files of catalog 
functions. 

> Jar files for catalog function are not listed correctly
> ---
>
> Key: FLINK-32332
> URL: https://issues.apache.org/jira/browse/FLINK-32332
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Gateway
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Priority: Major
>
> The `SHOW JARS` statement will list all jar files in the catalog, but the jar 
> files of a catalog function will not be listed before the function is used in 
> the specific gateway session



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32332) Jar files for catalog function are not listed correctly

2023-06-16 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17733390#comment-17733390
 ] 

Jark Wu commented on FLINK-32332:
-

[~zjureel], currently, SHOW JARS is used to display the jars added by ADD JAR 
statements. So it is expected that it does not list jar files of catalog 
functions. 
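
A minimal sketch of that intended scope (assuming Flink 1.16+ where ADD JAR is 
supported in TableEnvironment; the jar path is a placeholder):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ShowJarsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // SHOW JARS only reflects jars registered via ADD JAR in this session.
        tEnv.executeSql("ADD JAR '/path/to/my-udf.jar'");
        tEnv.executeSql("SHOW JARS").print();
        // Jars referenced by catalog functions (CREATE FUNCTION ... USING JAR)
        // are not listed until the function is actually used in the session.
    }
}
{code}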

> Jar files for catalog function are not listed correctly
> ---
>
> Key: FLINK-32332
> URL: https://issues.apache.org/jira/browse/FLINK-32332
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Gateway
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Priority: Major
>
> The `SHOW JARS` statement will list all jar files in the catalog, but the jar 
> files of a catalog function will not be listed before the function is used in 
> the specific gateway session



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-19830) Properly implements processing-time temporal table join

2023-06-15 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17732976#comment-17732976
 ] 

Jark Wu commented on FLINK-19830:
-

Hi [~rmetzger], sorry for the late reply. Yes, your understanding of the 
Processing Time Temporal Join is correct. But this is not the same limitation 
as the Event Time Temporal Join: there the probe side waits for the build side 
based on the watermark, so no wrong unmatched records are emitted for the 
event-time join. 

Currently, we have plans to support some kind of custom event mechanism to 
enable this feature in the near future. Hope I can come back with more 
information soon! 
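
For comparison, a sketch of the event-time variant, where the watermark-based 
synchronization described above applies (table definitions are omitted and the 
names are illustrative):

{code:java}
// Event-time temporal join: the probe side (Orders) waits for the build side
// (Rates) based on watermarks, so no wrong unmatched records are emitted.
TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.inStreamingMode());
// Orders needs an event-time attribute (order_time); Rates needs a primary key
// and an event-time attribute to be usable as a versioned table.
tEnv.executeSql(
        "SELECT o.order_id, o.price * r.rate AS converted "
                + "FROM Orders AS o "
                + "JOIN Rates FOR SYSTEM_TIME AS OF o.order_time AS r "
                + "ON o.currency = r.currency");
{code}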

> Properly implements processing-time temporal table join
> ---
>
> Key: FLINK-19830
> URL: https://issues.apache.org/jira/browse/FLINK-19830
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Leonard Xu
>Assignee: Leonard Xu
>Priority: Major
>
> The existing TemporalProcessTimeJoinOperator already supports the temporal 
> table join.
>  However, the semantics of this implementation are problematic, because the 
> join processing for the left stream doesn't wait for a complete snapshot of 
> the temporal table; this may mislead users in production environments.
> Under the processing-time temporal join semantics, getting a complete 
> snapshot of the temporal table may require introducing a new mechanism in 
> Flink SQL in the future.
> **Background** : 
>  * The reason why we turn off the switch[1] for `FOR SYSTEM_TIME AS OF` 
> syntax for *temporal table join* is only the semantic consideration as above.
>  * The reason why we turn on *temporal table function* is that it has been 
> alive for a long time; although it has the same semantic problem, we still 
> support it from the perspective of compatibility.
> [1] 
> [https://github.com/apache/flink/blob/4fe9f525a92319acc1e3434bebed601306f7a16f/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java#L257]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-31835) DataTypeHint don't support Row<ARRAY<INT>>

2023-06-12 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-31835.
---
Fix Version/s: 1.18.0
 Assignee: Aitozi
   Resolution: Fixed

Fixed in master: a6adbdda0cdf90635f0cd7a3427486bced301fbd

> DataTypeHint don't support Row<ARRAY<INT>>
> 
>
> Key: FLINK-31835
> URL: https://issues.apache.org/jira/browse/FLINK-31835
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.4
>Reporter: jeff-zou
>Assignee: Aitozi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Using DataTypeHint("Row<ARRAY<INT>>") in a UDF gives the following error:
>  
> {code:java}
> Caused by: java.lang.ClassCastException: class [I cannot be cast to class 
> [Ljava.lang.Object; ([I and [Ljava.lang.Object; are in module java.base of 
> loader 'bootstrap')
> org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:40)
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
> org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:75)
> org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
> StreamExecCalc$251.processElement_split9(Unknown Source)
> StreamExecCalc$251.processElement(Unknown Source)
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>  {code}
>  
> The function is as follows:
> {code:java}
> @DataTypeHint("Row<ARRAY<INT>>")
> public Row eval() {
> int[] i = new int[3];
> return Row.of(i);
> } {code}
>  
> This error is not reported when testing other simple types, so it is not an 
> environmental problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32204) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with The ExecutorService is shut down already. No Callables can be executed on AZP

2023-06-12 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17731531#comment-17731531
 ] 

Jark Wu commented on FLINK-32204:
-

ZooKeeperLeaderElectionTest.testZooKeeperReelection
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49893&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8

> ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement fails with 
> The ExecutorService is shut down already. No Callables can be executed on AZP
> ---
>
> Key: FLINK-32204
> URL: https://issues.apache.org/jira/browse/FLINK-32204
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: pull-request-available, test-stability
>
> [This 
> build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49386&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=7095]
>  fails as
> {noformat}
> May 25 18:45:50 Caused by: java.util.concurrent.RejectedExecutionException: 
> The ExecutorService is shut down already. No Callables can be executed.
> May 25 18:45:50   at 
> org.apache.flink.util.concurrent.DirectExecutorService.throwRejectedExecutionExceptionIfShutdown(DirectExecutorService.java:237)
> May 25 18:45:50   at 
> org.apache.flink.util.concurrent.DirectExecutorService.submit(DirectExecutorService.java:100)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:902)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.publishEvent(TreeCache.java:894)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache.access$1200(TreeCache.java:79)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.TreeCache$TreeNode.processResult(TreeCache.java:489)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.sendToBackgroundCallback(CuratorFrameworkImpl.java:926)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.processBackgroundOperation(CuratorFrameworkImpl.java:683)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.WatcherRemovalFacade.processBackgroundOperation(WatcherRemovalFacade.java:152)
> May 25 18:45:50   at 
> org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.GetDataBuilderImpl$3.processResult(GetDataBuilderImpl.java:272)
> May 25 18:45:50   at 
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:634)
> May 25 18:45:50   at 
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:553)
> May 25 18:45:50
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32276) After adding the where condition to the flink lookup left join, the joinType becomes innerJoin

2023-06-07 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730339#comment-17730339
 ] 

Jark Wu commented on FLINK-32276:
-

[~yesorno] it works as expected. Adding the condition {{dim.age > 10}} means 
{{dim.age}} must not be NULL. "LEFT JOIN + right side is not null" can be 
optimized into an INNER JOIN; the two are equivalent.
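
A minimal illustration, using the dim/actions tables from the issue description 
below:

{code:java}
// The two queries are equivalent: an unmatched left row is padded with NULLs
// on the right side, and a NULL dim.age can never satisfy dim.age > 10, so the
// planner may rewrite the LEFT JOIN as an INNER JOIN.
String leftJoinWithFilter =
        "SELECT * FROM actions "
                + "LEFT JOIN dim FOR SYSTEM_TIME AS OF actions.proc "
                + "ON actions.id = dim.id "
                + "WHERE dim.age > 10";
String innerJoin =
        "SELECT * FROM actions "
                + "JOIN dim FOR SYSTEM_TIME AS OF actions.proc "
                + "ON actions.id = dim.id "
                + "WHERE dim.age > 10";
{code}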

> After adding the where condition to the flink lookup left join, the joinType 
> becomes innerJoin
> --
>
> Key: FLINK-32276
> URL: https://issues.apache.org/jira/browse/FLINK-32276
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.17.1
>Reporter: Xianxun Ye
>Priority: Major
> Attachments: lookup_join_inner_join_type.jpg
>
>
> *How to reproduce:*
> {code:java}
> CREATE TABLE dim (
>   id BIGINT,
>   name STRING,
>   age INT,
>   status BOOLEAN,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
>   'table-name' = 'users'
> );
> create table actions (
>   id bigint,
>   proc as proctime(),
>   primary key (id) not enforced
> ) with (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
>   'table-name' = 'actions'
> );
> select
>   *
> from
>   actions
>   left join dim for system_time as of actions.proc on actions.id = dim.id
> where
>   dim.age > 10; {code}
> When running the above SQL, the LookupJoin operator is executed based on 
> InnerJoin, contrary to the SQL's left join.
> If I remove the where condition(dim.age>10), the LookupJoin's joinType is 
> LeftOuterJoin.
> Is this a bug?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32252) SELECT COUNT(*) will return nothing when the source returns no data

2023-06-05 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17729559#comment-17729559
 ] 

Jark Wu commented on FLINK-32252:
-

[~tanjialiang] That works as expected. Currently, streaming mode assumes the 
source is unbounded, so it should never return 0. But I agree the semantics are 
not consistent between batch and streaming. We have FLINK-12215 and FLINK-18365 
to track and improve this. 
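
A small sketch of the difference, assuming the student table from the issue 
below has been registered in both environments:

{code:java}
// Batch mode: the bounded scan finishes and COUNT(*) emits a final row with 0.
TableEnvironment batchEnv =
        TableEnvironment.create(EnvironmentSettings.inBatchMode());
batchEnv.executeSql("SELECT COUNT(*) FROM student WHERE age < 15").print();

// Streaming mode: the source is assumed unbounded; with no matching input rows
// the aggregation never emits a result, so the query appears to return nothing.
TableEnvironment streamEnv =
        TableEnvironment.create(EnvironmentSettings.inStreamingMode());
streamEnv.executeSql("SELECT COUNT(*) FROM student WHERE age < 15").print();
{code}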

> SELECT COUNT(*) will return nothing when the source returns no data
> --
>
> Key: FLINK-32252
> URL: https://issues.apache.org/jira/browse/FLINK-32252
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: jdbc-3.1.0
>Reporter: tanjialiang
>Priority: Major
>
>  
> mysql source
> {code:java}
> CREATE TABLE student(
> id int primary key auto_increment,
> name varchar(32),
> age int
> );
> INSERT INTO student(name, age) VALUES 
> ('tanjl',18),('jark',20),('mike',16),('rose',21);{code}
>  
> Flink SQL
> {code:java}
> CREATE TABLE student (
> `id` INT PRIMARY KEY,
> `name` STRING,
> `age` INT
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost/test?serverTimezone=UTC',
>   'username' = 'root',
>   'password' = 'root',
>   'table-name' = 'student'
> ); 
> SELECT count(*) FROM student WHERE age < 15;{code}
> Flink will return nothing because the JDBC connector pushes the filter down 
> (after flink-connector-jdbc-3.1.0), which makes the source return no data.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32252) SELECT COUNT(*) will return nothing when the source returns no data

2023-06-05 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17729559#comment-17729559
 ] 

Jark Wu edited comment on FLINK-32252 at 6/6/23 2:43 AM:
-

[~tanjialiang] That works as expected (by design). Currently, streaming mode 
assumes the source is unbounded, so it should never return 0. But I agree the 
semantics are not consistent between batch and streaming. We have FLINK-12215 
and FLINK-18365 to track and improve this. 


was (Author: jark):
[~tanjialiang] That works as expected. Currently, streaming mode assumes the 
source is unbounded, so it should never return 0. But I agree the semantics are 
not consistent between batch and streaming. We have FLINK-12215 and FLINK-18365 
to track and improve this. 

> SELECT COUNT(*) will return nothing when the source returns no data
> --
>
> Key: FLINK-32252
> URL: https://issues.apache.org/jira/browse/FLINK-32252
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: jdbc-3.1.0
>Reporter: tanjialiang
>Priority: Major
>
>  
> mysql source
> {code:java}
> CREATE TABLE student(
> id int primary key auto_increment,
> name varchar(32),
> age int
> );
> INSERT INTO student(name, age) VALUES 
> ('tanjl',18),('jark',20),('mike',16),('rose',21);{code}
>  
> Flink SQL
> {code:java}
> CREATE TABLE student (
> `id` INT PRIMARY KEY,
> `name` STRING,
> `age` INT
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost/test?serverTimezone=UTC',
>   'username' = 'root',
>   'password' = 'root',
>   'table-name' = 'student'
> ); 
> SELECT count(*) FROM student WHERE age < 15;{code}
> Flink will return nothing because the JDBC connector pushes the filter down 
> (after flink-connector-jdbc-3.1.0), which makes the source return no data.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32220) Improving the adaptive local hash agg code to avoid getting values from RowData repeatedly

2023-06-05 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-32220.
---
  Assignee: dalongliu
Resolution: Fixed

Fixed in master: 3046875c7aa0501f9a67f280034a74ea107315e3

> Improving the adaptive local hash agg code to avoid getting values from 
> RowData repeatedly
> -
>
> Key: FLINK-32220
> URL: https://issues.apache.org/jira/browse/FLINK-32220
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32251) flink jdbc catalog cannot add parameters to url

2023-06-05 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-32251:

Component/s: Connectors / JDBC

> flink jdbc catalog cannot add parameters to url
> ---
>
> Key: FLINK-32251
> URL: https://issues.apache.org/jira/browse/FLINK-32251
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / JDBC
>Reporter: melin
>Priority: Major
>
> For example, add an encoding parameter to the mysql url:
> jdbc:mysql://host:port/database?useUnicode=true&characterEncoding=UTF-8



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32252) SELECT COUNT(*) will return nothing when the source returns no data

2023-06-05 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17729302#comment-17729302
 ] 

Jark Wu commented on FLINK-32252:
-

Were you running in batch mode or streaming mode?

> SELECT COUNT(*) will return nothing when the source returns no data
> --
>
> Key: FLINK-32252
> URL: https://issues.apache.org/jira/browse/FLINK-32252
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.1
>Reporter: tanjialiang
>Priority: Major
>
>  
> mysql source
> {code:java}
> CREATE TABLE student(
> id int primary key auto_increment,
> name varchar(32),
> age int
> );
> INSERT INTO student(name, age) VALUES 
> ('tanjl',18),('jark',20),('mike',16),('rose',21);{code}
>  
> Flink SQL
> {code:java}
> CREATE TABLE student (
> `id` INT PRIMARY KEY,
> `name` STRING,
> `age` INT
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost/test?serverTimezone=UTC',
>   'username' = 'root',
>   'password' = 'root',
>   'table-name' = 'student'
> ); 
> SELECT count(*) FROM student WHERE age < 15;{code}
> Flink will return nothing because the JDBC connector pushes the filter down 
> (after flink-connector-jdbc-3.1.0), which makes the source return no data.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32219) SQL client hangs when executing EXECUTE PLAN

2023-06-01 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-32219:

Component/s: Table SQL / Client

> SQL client hangs when executing EXECUTE PLAN
> 
>
> Key: FLINK-32219
> URL: https://issues.apache.org/jira/browse/FLINK-32219
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.1
>Reporter: Shuai Xu
>Priority: Major
>
> I compiled a plan for an INSERT statement and executed the plan, but the SQL 
> client became unresponsive when executing the EXECUTE PLAN statement. I 
> confirmed that the Flink job is running normally by checking the Flink 
> dashboard. The only issue is that the SQL client becomes stuck and cannot 
> accept new commands. I printed the stack trace of the SQL client process, and 
> here is a part of it for reference.
> {code:java}
> "pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 
> waiting on condition [0x000173e01000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x00076e72af20> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
>   at 
> org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37)
>   at 
> org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106)
>   at java.util.Iterator.forEachRemaining(Iterator.java:115)
>   at 
> org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:115)
>   at 
> org.apache.flink.table.gateway.service.result.ResultFetcher.fromTableResult(ResultFetcher.java:163)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:542)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:440)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
>   at 
> org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
>   at 
> org.apache.flink.table.gateway.service.SqlGatewayServiceImpl$$Lambda$389/1391083077.apply(Unknown
>  Source)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationManager$$Lambda$390/208625838.call(Unknown
>  Source)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation$$Lambda$392/670621032.run(Unknown
>  Source)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:750)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?

2023-05-26 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726598#comment-17726598
 ] 

Jark Wu commented on FLINK-32188:
-

But I'm also confused about why the array constructor is not evaluated into a 
literal before being pushed down. 

> Does the custom connector not support pushing down "where" query predicates 
> to query fields of array type?
> --
>
> Key: FLINK-32188
> URL: https://issues.apache.org/jira/browse/FLINK-32188
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2, 1.17.0, 1.16.1
>Reporter: Xin Chen
>Priority: Major
> Attachments: image-2023-05-25-17-16-02-288.png, 
> image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, 
> screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, 
> screenshot-5.png, screenshot-6.png, screenshot-7.png
>
>
> When I customized a data source connector (assume an image connector), I 
> created a table with DDL and declared a field URL as an array type. When 
> submitting a SQL task with Flink, I queried this field against a fixed array, 
> for example, select * from image_source where 
> URL = ARRAY['/flink.jpg', '/flink_1.jpg'], but the connector couldn't obtain 
> the corresponding predicate filters at all.
> Does the custom connector not support "where" predicates on fields of array 
> type?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32188) Does the custom connector not support pushing down "where" query predicates to query fields of array type?

2023-05-26 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726597#comment-17726597
 ] 

Jark Wu commented on FLINK-32188:
-

Yes, I think your fix is fine. The function name "array" in 
BuiltInFunctionDefinition is not equal to "ARRAY_VALUE_CONSTRUCTOR"; I think 
that's why we can't find the function via {{lookupFunction}}.
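
For reference, a sketch of where such predicates arrive in a custom connector; 
SupportsFilterPushDown is the actual interface, while isSupported is a 
hypothetical helper:

{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.expressions.ResolvedExpression;

// Inside a custom ScanTableSource that implements SupportsFilterPushDown:
@Override
public Result applyFilters(List<ResolvedExpression> filters) {
    List<ResolvedExpression> accepted = new ArrayList<>();
    List<ResolvedExpression> remaining = new ArrayList<>();
    for (ResolvedExpression filter : filters) {
        // Per this issue, URL = ARRAY[...] may arrive as a CallExpression whose
        // right-hand side is an unevaluated array constructor rather than a
        // ValueLiteralExpression, so a literal-only check would reject it.
        if (isSupported(filter)) { // hypothetical helper
            accepted.add(filter);
        } else {
            remaining.add(filter); // the planner evaluates these after the scan
        }
    }
    return Result.of(accepted, remaining);
}
{code}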

> Does the custom connector not support pushing down "where" query predicates 
> to query fields of array type?
> --
>
> Key: FLINK-32188
> URL: https://issues.apache.org/jira/browse/FLINK-32188
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2, 1.17.0, 1.16.1
>Reporter: Xin Chen
>Priority: Major
> Attachments: image-2023-05-25-17-16-02-288.png, 
> image-2023-05-25-20-44-08-834.png, image-2023-05-25-20-44-47-581.png, 
> screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png, 
> screenshot-5.png, screenshot-6.png, screenshot-7.png
>
>
> When I customized a data source connector (assume an image connector), I 
> created a table with DDL and declared a field URL as an array type. When 
> submitting a SQL task with Flink, I queried this field against a fixed array, 
> for example, select * from image_source where 
> URL = ARRAY['/flink.jpg', '/flink_1.jpg'], but the connector couldn't obtain 
> the corresponding predicate filters at all.
> Does the custom connector not support "where" predicates on fields of array 
> type?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32043) SqlClient session unrecoverable once one wrong setting occurred

2023-05-23 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-32043:

Priority: Critical  (was: Major)

> SqlClient session unrecoverable once one wrong setting occurred
> ---
>
> Key: FLINK-32043
> URL: https://issues.apache.org/jira/browse/FLINK-32043
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0
>Reporter: lincoln lee
>Priority: Critical
>
> In the SQL client, the session cannot work normally once a wrong setting has occurred
> {code:java}
> // wrong setting here
> Flink SQL> SET table.sql-dialect = flink;
> [INFO] Execute statement succeed.
> Flink SQL> select '' AS f1, a from t1;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalArgumentException: No enum constant 
> org.apache.flink.table.api.SqlDialect.FLINK
> Flink SQL> SET table.sql-dialect = default;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalArgumentException: No enum constant 
> org.apache.flink.table.api.SqlDialect.FLINK
> Flink SQL> RESET table.sql-dialect;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalArgumentException: No enum constant 
> org.apache.flink.table.api.SqlDialect.FLINK
> Flink SQL> RESET;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalArgumentException: No enum constant 
> org.apache.flink.table.api.SqlDialect.FLINK 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31822) Support configuring maxRows when fetching results

2023-05-21 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17724692#comment-17724692
 ] 

Jark Wu commented on FLINK-31822:
-

Sounds good to me. What do you think [~fsk119]

> Support configuring maxRows when fetching results 
> 
>
> Key: FLINK-31822
> URL: https://issues.apache.org/jira/browse/FLINK-31822
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Affects Versions: 1.16.1
>Reporter: Feng Jin
>Priority: Major
>
> The default value of maxRows when fetching results is 5000. When requested 
> from a web page, too many results in a single request may cause the page to 
> freeze.
>  
> Therefore, we can support configuring the maximum number of results per 
> request.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

