[GitHub] [flink] zhoulii commented on pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits

2022-06-13 Thread GitBox


zhoulii commented on PR #19946:
URL: https://github.com/apache/flink/pull/19946#issuecomment-1154791356

   > > Maybe add a hotfix commit "Rework BinaryInputFormatTest to be based on 
AssertJ" first. And then add the new test using AssertJ.
   > 
   > Same as @zhuzhurk's previous suggestion: you should put the hotfix in the first commit, and then have your changes and new tests follow it.
   
   Sorry, I don't quite understand this suggestion. Do you mean I should open a new PR to rework BinaryInputFormatTest first?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28028) Common secure credential protection mechanism in Flink SQL

2022-06-13 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553931#comment-17553931
 ] 

Martijn Visser commented on FLINK-28028:


A potential solution was brought forward on the mailing list a while ago, see 
https://lists.apache.org/thread/ljn994s6xlzhz09ssxmynzotbw1mvt7f

> Common secure credential protection mechanism in Flink SQL
> --
>
> Key: FLINK-28028
> URL: https://issues.apache.org/jira/browse/FLINK-28028
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API, Table SQL / Runtime
>Reporter: Jing Ge
>Priority: Major
>
> Currently, the most common way to provide credentials is:
>  
> CREATE TABLE mytable (
>   ...
> )
> WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = '...:9092',
>   'topic' = '...',
>   'properties.ssl.keystore.password' = '...',
>   'properties.ssl.keystore.location' = '...',
>   'properties.ssl.truststore.password' = '...',
>   'properties.ssl.truststore.location' = '...'
> );
> The credentials can then be read back by calling SHOW CREATE TABLE.
> We should provide a stronger way to protect them.
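
To make the exposure concrete, here is a minimal, hedged sketch (table name and option values are made up) showing how SHOW CREATE TABLE echoes the WITH options, credentials included, back to the caller:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ShowCreateTableLeak {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Register a table whose WITH clause carries a secret.
        tEnv.executeSql(
                "CREATE TABLE mytable (id STRING) WITH ("
                        + " 'connector' = 'kafka',"
                        + " 'topic' = 'orders',"
                        + " 'properties.bootstrap.servers' = 'broker:9092',"
                        + " 'properties.ssl.keystore.password' = 'secret',"
                        + " 'format' = 'json')");
        // Prints the full DDL verbatim, 'secret' included -- the leak this
        // ticket wants to prevent.
        tEnv.executeSql("SHOW CREATE TABLE mytable").print();
    }
}
{code}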



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-28033) find and output new min watermark may be wrong with multiple channels

2022-06-13 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-28033:
---
Description: 
File: StatusWatermarkValve.java

Method:  findAndOutputNewMinWatermarkAcrossAlignedChannels
{code:java}
// code placeholder
long newMinWatermark = Long.MAX_VALUE;
boolean hasAlignedChannels = false;

// determine new overall watermark by considering only watermark-aligned channels
// across all channels
for (InputChannelStatus channelStatus : channelStatuses) {
    if (channelStatus.isWatermarkAligned) {
        hasAlignedChannels = true;
        newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
    }
}

// we acknowledge and output the new overall watermark if it really is aggregated
// from some remaining aligned channel, and is also larger than the last output watermark
if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
    lastOutputWatermark = newMinWatermark;
    output.emitWatermark(new Watermark(lastOutputWatermark));
}
{code}
Each channelStatus's watermark is initialized to Long.MIN_VALUE. When one channelStatus's watermark changes but the others' do not, newMinWatermark remains Long.MIN_VALUE and the output never emits a watermark.

  was:
File: StatusWatermarkValue.java

Method:  findAndOutputNewMinWatermarkAcrossAlignedChannels
{code:java}
// code placeholder
long newMinWatermark = Long.MAX_VALUE;
boolean hasAlignedChannels = false;

// determine new overall watermark by considering only watermark-aligned channels
// across all channels
for (InputChannelStatus channelStatus : channelStatuses) {
    if (channelStatus.isWatermarkAligned) {
        hasAlignedChannels = true;
        newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
    }
}

// we acknowledge and output the new overall watermark if it really is aggregated
// from some remaining aligned channel, and is also larger than the last output watermark
if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
    lastOutputWatermark = newMinWatermark;
    output.emitWatermark(new Watermark(lastOutputWatermark));
}
{code}
Each channelStatus's watermark is initialized to Long.MIN_VALUE. When one channelStatus's watermark changes but the others' do not, newMinWatermark remains Long.MIN_VALUE and the output never emits a watermark.


> find and output new min watermark may be wrong with multiple channels
> -
>
> Key: FLINK-28033
> URL: https://issues.apache.org/jira/browse/FLINK-28033
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Reporter: YeAble
>Priority: Major
>
> File: StatusWatermarkValve.java
> Method:  findAndOutputNewMinWatermarkAcrossAlignedChannels
> {code:java}
> // code placeholder
> long newMinWatermark = Long.MAX_VALUE;
> boolean hasAlignedChannels = false;
>
> // determine new overall watermark by considering only watermark-aligned channels
> // across all channels
> for (InputChannelStatus channelStatus : channelStatuses) {
>     if (channelStatus.isWatermarkAligned) {
>         hasAlignedChannels = true;
>         newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
>     }
> }
>
> // we acknowledge and output the new overall watermark if it really is aggregated
> // from some remaining aligned channel, and is also larger than the last output watermark
> if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
>     lastOutputWatermark = newMinWatermark;
>     output.emitWatermark(new Watermark(lastOutputWatermark));
> }
> {code}
> Each channelStatus's watermark is initialized to Long.MIN_VALUE. When one
> channelStatus's watermark changes but the others' do not, newMinWatermark
> remains Long.MIN_VALUE and the output never emits a watermark.
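
For readers skimming the digest, here is a minimal, self-contained sketch of the scenario the reporter describes; InputChannelStatus is reduced to the two fields the excerpt uses, and the aggregation loop is copied from it:

{code:java}
public class MinWatermarkScenario {
    // Reduced stand-in for Flink's InputChannelStatus, for illustration only.
    static class InputChannelStatus {
        long watermark = Long.MIN_VALUE; // initial value, as the reporter notes
        boolean isWatermarkAligned = true;
    }

    public static void main(String[] args) {
        InputChannelStatus[] channelStatuses = {
            new InputChannelStatus(), new InputChannelStatus()
        };
        channelStatuses[0].watermark = 42L; // only channel 0 ever advances

        long lastOutputWatermark = Long.MIN_VALUE;
        long newMinWatermark = Long.MAX_VALUE;
        boolean hasAlignedChannels = false;
        for (InputChannelStatus channelStatus : channelStatuses) {
            if (channelStatus.isWatermarkAligned) {
                hasAlignedChannels = true;
                newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
            }
        }
        // newMinWatermark is Long.MIN_VALUE because channel 1 never advanced,
        // so the guard below is false and nothing is emitted -- the behavior
        // the reporter observes.
        if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
            System.out.println("emit " + newMinWatermark);
        }
    }
}
{code}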



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] shuiqiangchen commented on pull request #19140: [FLINK-25231][python]Update Pyflink to use the new type system

2022-06-13 Thread GitBox


shuiqiangchen commented on PR #19140:
URL: https://github.com/apache/flink/pull/19140#issuecomment-1154786882

   @HuangXingBo Thank you for your comments. I have updated the PR according to your suggestions, please have a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28033) find and output new min watermark may be wrong with multiple channels

2022-06-13 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-28033:
---
Priority: Major  (was: Blocker)

> find and output new min watermark may be wrong with multiple channels
> -
>
> Key: FLINK-28033
> URL: https://issues.apache.org/jira/browse/FLINK-28033
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Reporter: YeAble
>Priority: Major
>
> File: StatusWatermarkValue.java
> Method:  findAndOutputNewMinWatermarkAcrossAlignedChannels
> {code:java}
> // code placeholder
> long newMinWatermark = Long.MAX_VALUE;
> boolean hasAlignedChannels = false;
>
> // determine new overall watermark by considering only watermark-aligned channels
> // across all channels
> for (InputChannelStatus channelStatus : channelStatuses) {
>     if (channelStatus.isWatermarkAligned) {
>         hasAlignedChannels = true;
>         newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
>     }
> }
>
> // we acknowledge and output the new overall watermark if it really is aggregated
> // from some remaining aligned channel, and is also larger than the last output watermark
> if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
>     lastOutputWatermark = newMinWatermark;
>     output.emitWatermark(new Watermark(lastOutputWatermark));
> }
> {code}
> Each channelStatus's watermark is initialized to Long.MIN_VALUE. When one
> channelStatus's watermark changes but the others' do not, newMinWatermark
> remains Long.MIN_VALUE and the output never emits a watermark.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] reswqa commented on a diff in pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits

2022-06-13 Thread GitBox


reswqa commented on code in PR #19946:
URL: https://github.com/apache/flink/pull/19946#discussion_r896438515


##
flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java:
##
@@ -73,13 +76,21 @@ public void testCreateInputSplitsWithOneFile() throws 
IOException {
 
 FileInputSplit[] inputSplits = 
inputFormat.createInputSplits(numBlocks);
 
-Assert.assertEquals("Returns requested numbers of splits.", numBlocks, 
inputSplits.length);
-Assert.assertEquals(
-"1. split has block size length.", blockSize, 
inputSplits[0].getLength());
-Assert.assertEquals(
-"2. split has block size length.", blockSize, 
inputSplits[1].getLength());
-Assert.assertEquals(
-"3. split has block size length.", blockSize, 
inputSplits[2].getLength());
+assertThat(inputSplits.length)

Review Comment:
   The same problem in the other tests also needs to be fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits

2022-06-13 Thread GitBox


reswqa commented on code in PR #19946:
URL: https://github.com/apache/flink/pull/19946#discussion_r896430817


##
flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java:
##
@@ -154,17 +170,60 @@ public void testGetStatisticsMultiplePaths() throws 
IOException {
 inputFormat.setBlockSize(blockSize);
 
 BaseStatistics stats = inputFormat.getStatistics(null);
-Assert.assertEquals(
-"The file size statistics is wrong",
-blockSize * (numBlocks1 + numBlocks2),
-stats.getTotalInputSize());
+
+assertThat(stats.getTotalInputSize())
+.as("The file size statistics is wrong")
+.isEqualTo(blockSize * (numBlocks1 + numBlocks2));
+}
+
+@Test
+public void testCreateInputSplitsWithEmptySplit() throws IOException {
+final int blockInfoSize = new BlockInfo().getInfoSize();
+final int blockSize = blockInfoSize + 8;
+final int numBlocks = 3;
+final int minNumSplits = 5;
+
+// create temporary file with 3 blocks
+final File tempFile =
+createBinaryInputFile(
+"test_create_input_splits_with_empty_split", 
blockSize, numBlocks);
+
+final Configuration config = new Configuration();
+config.setLong("input.block_size", blockSize + 10);
+
+final BinaryInputFormat inputFormat = new 
MyBinaryInputFormat();
+inputFormat.setFilePath(tempFile.toURI().toString());
+inputFormat.setBlockSize(blockSize);
+
+inputFormat.configure(config);
+
+FileInputSplit[] inputSplits = 
inputFormat.createInputSplits(minNumSplits);
+
+assertThat(inputSplits.length)
+.as("Returns requested numbers of splits.")
+.isEqualTo(minNumSplits);
+
+assertThat(inputSplits[0].getLength())
+.as("1. split has block size length.")

Review Comment:
   IMO, `as` is used to add a description that is shown when the assertion does not pass.
   The error message will be displayed in the following format:
   `org.opentest4j.AssertionFailedError: [description in as()]`
   So I think the `as` descriptions in these tests should be more accurate, such as
   "1. split should have block size length."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28026) Add benchmark module for flink table store

2022-06-13 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553929#comment-17553929
 ] 

Jingsong Lee commented on FLINK-28026:
--

[~Aiden Gong] Thanks!

> Add benchmark module for flink table store
> --
>
> Key: FLINK-28026
> URL: https://issues.apache.org/jira/browse/FLINK-28026
> Project: Flink
>  Issue Type: New Feature
>  Components: Table Store
>Reporter: Aiden Gong
>Assignee: Aiden Gong
>Priority: Minor
> Fix For: table-store-0.2.0
>
>
> Add benchmark module for flink table store.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-28026) Add benchmark module for flink table store

2022-06-13 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-28026:


Assignee: Aiden Gong

> Add benchmark module for flink table store
> --
>
> Key: FLINK-28026
> URL: https://issues.apache.org/jira/browse/FLINK-28026
> Project: Flink
>  Issue Type: New Feature
>  Components: Table Store
>Reporter: Aiden Gong
>Assignee: Aiden Gong
>Priority: Minor
> Fix For: table-store-0.2.0
>
>
> Add benchmark module for flink table store.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Closed] (FLINK-28032) Checkpointing hangs and times out with some jobs

2022-06-13 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-28032.

Resolution: Information Provided

The thread dump on the taskmanager is not formatted in a way that is easy to read.
From my understanding, the checkpoint lock could not be acquired by the task thread, which leads to the job hanging.
Flink's Jira is not the place to ask user questions; Slack or the user mailing list is a better place. You can find the details on the community page: https://flink.apache.org/community.html

> Checkpointing hangs and times out with some jobs
> 
>
> Key: FLINK-28032
> URL: https://issues.apache.org/jira/browse/FLINK-28032
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.3
> Environment: Here are the environment details
>  * Flink version 1.14.3
>  * Running Kubernetes
>  * Using RocksDB state backend.
>  * Checkpoint storage is S3 storage using the Presto library
>  * Exactly Once Semantics with unaligned checkpoints enabled.
>  * Checkpoint timeout 2 hours
>  * Maximum concurrent checkpoints is 1
>  * Taskmanager CPU: 4, Slots: 1, Process Size: 12 GB
>  * Using Kafka for input and output
>Reporter: Pauli Gandhi
>Priority: Major
> Attachments: checkpoint snapshot.png, jobgraph.png, 
> taskmanager_10.112.55.143_6122-969889_log, 
> taskmanager_10.112.55.143_6122-969889_thread_dump
>
>
> We have noticed that the Flink job hangs and eventually times out after 2 hours,
> every time at the first checkpoint, after it completes 15/23 (65%)
> acknowledgments. There is no CPU/record-processing activity, yet there
> are a number of tasks reporting 100% back pressure. It is peculiar to this
> job and slight modifications to this job.  We have created many Flink jobs in 
> the past and never encountered the issue.  
> Here are the things we tried to narrow down the problem
>  * The job runs fine if checkpointing is disabled.
>  * Increasing the number of task managers and parallelism to 2 seems to help 
> the job complete.  However, it stalled again when we sent a larger data set.
>  * Increased taskmanager memory from 4 GB to 16 GB and cpu from 1 to 4 but 
> didn't help.
>  * Sometimes restarting the job manager helps but at other times not.
>  * Breaking up the job into smaller parts helps the job to finish.
> Analyzed the thread dump; it appears all threads are either in a
> sleeping or waiting state.
> I have attached the task manager logs (including debug logs for 
> checkpointing), thread dump, and screen shots of the job graph and stalled 
> checkpoint.
> Your help in resolving this issue is greatly appreciated.
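
For reference, the environment quoted above corresponds roughly to the checkpoint configuration below. This is a hedged sketch: the S3 path is made up, and whether RocksDB checkpoints are incremental is not stated in the report.

{code:java}
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new EmbeddedRocksDBStateBackend()); // RocksDB state backend

        // Exactly-once semantics with unaligned checkpoints enabled.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.enableUnalignedCheckpoints();
        checkpointConfig.setCheckpointTimeout(2 * 60 * 60 * 1000L); // 2 hours
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        // Presto-based S3 checkpoint storage; the bucket path is hypothetical.
        checkpointConfig.setCheckpointStorage("s3p://my-bucket/checkpoints");
    }
}
{code}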



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-28037) Flink SQL Upsert-Kafka cannot support Flink 1.14.x

2022-06-13 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553927#comment-17553927
 ] 

Martijn Visser commented on FLINK-28037:


[~renqs] Can you have a look at this one?

> Flink SQL Upsert-Kafka cannot support Flink 1.14.x
> --
>
> Key: FLINK-28037
> URL: https://issues.apache.org/jira/browse/FLINK-28037
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.0, 1.14.2, 1.14.3, 1.14.4
> Environment: Flink Version: 1.14.0 1.14.2 1.14.3 1.14.4
>Reporter: Jiangfei Liu
>Priority: Major
>  Labels: flink-connector-kafka, upser-kafka
> Attachments: kafka-sql.png, kafka-sql2.png
>
>
> In Flink 1.14.x, the Flink SQL upsert-kafka sink cannot write data into a Kafka
> topic when a sink buffer flush config is set, e.g.
> h5. sink.buffer-flush.max-rows
> h5. sink.buffer-flush.interval
> In Flink 1.13.x, the Flink SQL upsert-kafka sink can write data into a Kafka
> topic with a sink buffer flush config.
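
A hedged sketch of the setup being described: an upsert-kafka sink DDL with buffer flushing enabled (topic, broker, and schema are made up):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaBufferFlushSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE sink_table ("
                        + "  id STRING,"
                        + "  cnt BIGINT,"
                        + "  PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'upsert-kafka',"
                        + "  'topic' = 'out-topic',"
                        + "  'properties.bootstrap.servers' = 'broker:9092',"
                        + "  'key.format' = 'json',"
                        + "  'value.format' = 'json',"
                        // Both buffer-flush options must be positive for
                        // buffering to take effect.
                        + "  'sink.buffer-flush.max-rows' = '1000',"
                        + "  'sink.buffer-flush.interval' = '1s'"
                        + ")");
    }
}
{code}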



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-28037) Flink SQL Upsert-Kafka cannot support Flink 1.14.x

2022-06-13 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-28037:
---
Affects Version/s: (was: 1.14.0)
   (was: 1.14.2)
   (was: 1.14.3)

> Flink SQL Upsert-Kafka cannot support Flink 1.14.x
> --
>
> Key: FLINK-28037
> URL: https://issues.apache.org/jira/browse/FLINK-28037
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.4
> Environment: Flink Version: 1.14.0 1.14.2 1.14.3 1.14.4
>Reporter: Jiangfei Liu
>Priority: Major
>  Labels: flink-connector-kafka, upser-kafka
> Attachments: kafka-sql.png, kafka-sql2.png
>
>
> In Flink 1.14.x, the Flink SQL upsert-kafka sink cannot write data into a Kafka
> topic when a sink buffer flush config is set, e.g.
> h5. sink.buffer-flush.max-rows
> h5. sink.buffer-flush.interval
> In Flink 1.13.x, the Flink SQL upsert-kafka sink can write data into a Kafka
> topic with a sink buffer flush config.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] reswqa commented on pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits

2022-06-13 Thread GitBox


reswqa commented on PR #19946:
URL: https://github.com/apache/flink/pull/19946#issuecomment-1154783029

   > Maybe add a hotfix commit "Rework BinaryInputFormatTest to be based on 
AssertJ" first. And then add the new test using AssertJ.
   
   Same as @zhuzhurk's previous suggestion: you should put the hotfix in the first commit, and then have your changes and new tests follow it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

2022-06-13 Thread GitBox


dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1154782595

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits

2022-06-13 Thread GitBox


reswqa commented on code in PR #19946:
URL: https://github.com/apache/flink/pull/19946#discussion_r896422901


##
flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java:
##
@@ -73,13 +76,21 @@ public void testCreateInputSplitsWithOneFile() throws 
IOException {
 
 FileInputSplit[] inputSplits = 
inputFormat.createInputSplits(numBlocks);
 
-Assert.assertEquals("Returns requested numbers of splits.", numBlocks, 
inputSplits.length);
-Assert.assertEquals(
-"1. split has block size length.", blockSize, 
inputSplits[0].getLength());
-Assert.assertEquals(
-"2. split has block size length.", blockSize, 
inputSplits[1].getLength());
-Assert.assertEquals(
-"3. split has block size length.", blockSize, 
inputSplits[2].getLength());
+assertThat(inputSplits.length)

Review Comment:
   ```suggestion
        assertThat(inputSplits)
                .as("Returns requested numbers of splits.")
                .hasSize(numBlocks);
   ```



##
flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java:
##
@@ -154,17 +170,60 @@ public void testGetStatisticsMultiplePaths() throws 
IOException {
 inputFormat.setBlockSize(blockSize);
 
 BaseStatistics stats = inputFormat.getStatistics(null);
-Assert.assertEquals(
-"The file size statistics is wrong",
-blockSize * (numBlocks1 + numBlocks2),
-stats.getTotalInputSize());
+
+assertThat(stats.getTotalInputSize())
+.as("The file size statistics is wrong")
+.isEqualTo(blockSize * (numBlocks1 + numBlocks2));
+}
+
+@Test
+public void testCreateInputSplitsWithEmptySplit() throws IOException {
+final int blockInfoSize = new BlockInfo().getInfoSize();
+final int blockSize = blockInfoSize + 8;
+final int numBlocks = 3;
+final int minNumSplits = 5;
+
+// create temporary file with 3 blocks
+final File tempFile =
+createBinaryInputFile(
+"test_create_input_splits_with_empty_split", 
blockSize, numBlocks);
+
+final Configuration config = new Configuration();
+config.setLong("input.block_size", blockSize + 10);
+
+final BinaryInputFormat inputFormat = new 
MyBinaryInputFormat();
+inputFormat.setFilePath(tempFile.toURI().toString());
+inputFormat.setBlockSize(blockSize);
+
+inputFormat.configure(config);
+
+FileInputSplit[] inputSplits = 
inputFormat.createInputSplits(minNumSplits);
+
+assertThat(inputSplits.length)
+.as("Returns requested numbers of splits.")
+.isEqualTo(minNumSplits);
+
+assertThat(inputSplits[0].getLength())
+.as("1. split has block size length.")
+.isEqualTo(blockSize);
+
+assertThat(inputSplits[1].getLength())
+.as("2. split has block size length.")
+.isEqualTo(blockSize);
+
+assertThat(inputSplits[2].getLength())
+.as("3. split has block size length.")
+.isEqualTo(blockSize);
+
+assertThat(inputSplits[3].getLength()).as("4. split has block size 
length.").isEqualTo(0);
+
+assertThat(inputSplits[4].getLength()).as("5. split has block size 
length.").isEqualTo(0);

Review Comment:
   ```suggestion
        assertThat(inputSplits[4].getLength()).as("5. split should be an empty split.").isZero();
   ```



##
flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java:
##
@@ -154,17 +170,60 @@ public void testGetStatisticsMultiplePaths() throws 
IOException {
 inputFormat.setBlockSize(blockSize);
 
 BaseStatistics stats = inputFormat.getStatistics(null);
-Assert.assertEquals(
-"The file size statistics is wrong",
-blockSize * (numBlocks1 + numBlocks2),
-stats.getTotalInputSize());
+
+assertThat(stats.getTotalInputSize())
+.as("The file size statistics is wrong")
+.isEqualTo(blockSize * (numBlocks1 + numBlocks2));
+}
+
+@Test
+public void testCreateInputSplitsWithEmptySplit() throws IOException {
+final int blockInfoSize = new BlockInfo().getInfoSize();
+final int blockSize = blockInfoSize + 8;
+final int numBlocks = 3;
+final int minNumSplits = 5;
+
+// create temporary file with 3 blocks
+final File tempFile =
+createBinaryInputFile(
+"test_create_input_splits_with_empty_split", 
blockSize, numBlocks);
+
+final Configuration config = new Configuration();
+config.setLong("input.block_size", blockSize + 10);
+
+final BinaryInputFormat inputFormat = new 
MyBinaryInputFormat();
+  

[GitHub] [flink] fsk119 commented on a diff in pull request #19823: [FLINK-27766][sql-gateway] Introduce the framework of the SqlGatewayService

2022-06-13 Thread GitBox


fsk119 commented on code in PR #19823:
URL: https://github.com/apache/flink/pull/19823#discussion_r896426974


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.context;
+
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.PlannerFactoryUtil;
+import org.apache.flink.table.gateway.common.endpoint.EndpointVersion;
+import org.apache.flink.table.gateway.common.session.SessionHandle;
+import org.apache.flink.table.gateway.service.operation.OperationManager;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.util.TemporaryClassLoaderContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Context describing a session. It is mainly used for users to open a new session in
+ * the backend. If a client requests to open a new session, the backend {@code Executor}
+ * will maintain the session context map until the user closes it.
+ */
+public class SessionContext {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SessionContext.class);
+
+private final SessionHandle sessionId;
+private final EndpointVersion endpointVersion;
+
+// store all options and use Configuration to build SessionState and 
TableConfig.
+private final Configuration sessionConf;
+private final SessionState sessionState;
+private final URLClassLoader userClassloader;
+
+private final OperationManager operationManager;
+
+private SessionContext(
+SessionHandle sessionId,
+EndpointVersion endpointVersion,
+Configuration sessionConf,
+URLClassLoader classLoader,
+SessionState sessionState,
+OperationManager operationManager) {
+this.sessionId = sessionId;
+this.endpointVersion = endpointVersion;
+this.sessionConf = sessionConf;
+this.userClassloader = classLoader;
+this.sessionState = sessionState;
+this.operationManager = operationManager;
+}
+
+// 

+// Getter method
+// 

+
+public SessionHandle getSessionId() {
+return this.sessionId;
+}
+
+public Map<String, String> getConfigMap() {
+return sessionConf.toMap();
+}
+
+// 

+// Method to execute commands
+// 

+
+/** Close resources, e.g. catalogs. */
+public void close() {
+try (TemporaryClassLoaderContext ignored =
+TemporaryClassLoaderContext.of(userClassloader)) {

Review Comment:
   I think we 

[jira] [Resolved] (FLINK-28038) RocksDB rescaling improvement & rescaling benchmark - umbrella ticket

2022-06-13 Thread Yuan Mei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuan Mei resolved FLINK-28038.
--
Resolution: Fixed

> RocksDB rescaling improvement & rescaling benchmark - umbrella ticket
> -
>
> Key: FLINK-28038
> URL: https://issues.apache.org/jira/browse/FLINK-28038
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Minor
> Fix For: 1.16.0
>
>
> Placeholder umbrella ticket for RocksDB rescaling improvement.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-28038) RocksDB rescaling improvement & rescaling benchmark - umbrella ticket

2022-06-13 Thread Yuan Mei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuan Mei reassigned FLINK-28038:


Assignee: Yanfei Lei

> RocksDB rescaling improvement & rescaling benchmark - umbrella ticket
> -
>
> Key: FLINK-28038
> URL: https://issues.apache.org/jira/browse/FLINK-28038
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Minor
>
> Placeholder umbrella ticket for RocksDB rescaling improvement.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-28038) RocksDB rescaling improvement & rescaling benchmark - umbrella ticket

2022-06-13 Thread Yuan Mei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuan Mei updated FLINK-28038:
-
Fix Version/s: 1.16.0

> RocksDB rescaling improvement & rescaling benchmark - umbrella ticket
> -
>
> Key: FLINK-28038
> URL: https://issues.apache.org/jira/browse/FLINK-28038
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Minor
> Fix For: 1.16.0
>
>
> Placeholder umbrella ticket for RocksDB rescaling improvement.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] fsk119 commented on a diff in pull request #19823: [FLINK-27766][sql-gateway] Introduce the framework of the SqlGatewayService

2022-06-13 Thread GitBox


fsk119 commented on code in PR #19823:
URL: https://github.com/apache/flink/pull/19823#discussion_r896425180


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java:
##
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.session;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.gateway.common.session.SessionEnvironment;
+import org.apache.flink.table.gateway.common.session.SessionHandle;
+import org.apache.flink.table.gateway.common.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.utils.Constants;
+import org.apache.flink.table.gateway.service.utils.ThreadUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL;
+import static 
org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT;
+import static 
org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM;
+import static 
org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_KEEPALIVE_TIME;
+import static 
org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX;
+import static 
org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MIN;
+
+/** Manage the lifecycle of the {@code Session}. */
+public class SessionManager {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SessionManager.class);
+
+private final DefaultContext defaultContext;
+
+private final long idleTimeout;
+private final long checkInterval;
+private final int maxSessionCount;
+
+private final Map<SessionHandle, Session> sessions;
+
+private ExecutorService operationExecutorService;
+private ScheduledExecutorService scheduledExecutorService;
+private ScheduledFuture<?> timeoutCheckerFuture;
+
+public SessionManager(DefaultContext defaultContext) {
+this.defaultContext = defaultContext;
+ReadableConfig conf = defaultContext.getFlinkConfig();
+this.idleTimeout = 
conf.get(SQL_GATEWAY_SESSION_IDLE_TIMEOUT).toMillis();
+this.checkInterval = 
conf.get(SQL_GATEWAY_SESSION_CHECK_INTERVAL).toMillis();
+this.maxSessionCount = conf.get(SQL_GATEWAY_SESSION_MAX_NUM);
+this.sessions = new ConcurrentHashMap<>();
+}
+
+public void start() {
+if (checkInterval > 0 && idleTimeout > 0) {
+scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
+timeoutCheckerFuture =
+scheduledExecutorService.scheduleAtFixedRate(
+() -> {
+LOG.debug(
+"Start to cleanup expired sessions, 
current session count: {}",
+sessions.size());
+for (Map.Entry<SessionHandle, Session> entry : sessions.entrySet()) {
+SessionHandle sessionId = entry.getKey();
+Session session = entry.getValue();
+if (isSessionExpired(session)) {
+LOG.info("Session {} is expired, close 
it...", sessionId);
+closeSession(session);
+ 

[jira] [Commented] (FLINK-28035) Don't check num of buckets for rescale bucket condition

2022-06-13 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553917#comment-17553917
 ] 

Jingsong Lee commented on FLINK-28035:
--

Can you add some description?

> Don't check num of buckets for rescale bucket condition
> ---
>
> Key: FLINK-28035
> URL: https://issues.apache.org/jira/browse/FLINK-28035
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Affects Versions: table-store-0.2.0
>Reporter: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-28035) Don't check num of buckets for rescale bucket condition

2022-06-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-28035:
---
Labels: pull-request-available  (was: )

> Don't check num of buckets for rescale bucket condition
> ---
>
> Key: FLINK-28035
> URL: https://issues.apache.org/jira/browse/FLINK-28035
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Affects Versions: table-store-0.2.0
>Reporter: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-table-store] JingsongLi commented on pull request #157: [FLINK-28035] Don't check num of buckets for rescale bucket condition

2022-06-13 Thread GitBox


JingsongLi commented on PR #157:
URL: 
https://github.com/apache/flink-table-store/pull/157#issuecomment-1154769567

   Why check only for rescale? Should an exception be thrown for normal reading as well?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-28040) Introduce Trino reader for table store

2022-06-13 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-28040:


 Summary: Introduce Trino reader for table store
 Key: FLINK-28040
 URL: https://issues.apache.org/jira/browse/FLINK-28040
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Reporter: Jingsong Lee
 Fix For: table-store-0.2.0


Can refer to FLINK-27947 to write a Trino reader.

See https://trino.io/



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] zhoulii commented on a diff in pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits

2022-06-13 Thread GitBox


zhoulii commented on code in PR #19946:
URL: https://github.com/apache/flink/pull/19946#discussion_r896417184


##
flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java:
##
@@ -160,11 +161,45 @@ public void testGetStatisticsMultiplePaths() throws 
IOException {
 stats.getTotalInputSize());
 }
 
+@Test
+public void testCreateInputSplitsWithEmptySplit() throws IOException {
+final int blockInfoSize = new BlockInfo().getInfoSize();
+final int blockSize = blockInfoSize + 8;
+final int numBlocks = 3;
+final int minNumSplits = 5;
+
+// create temporary file with 3 blocks
+final File tempFile =
+createBinaryInputFile(
+"test_create_input_splits_with_empty_split", 
blockSize, numBlocks);
+
+final Configuration config = new Configuration();
+config.setLong("input.block_size", blockSize + 10);
+
+final BinaryInputFormat inputFormat = new 
MyBinaryInputFormat();
+inputFormat.setFilePath(tempFile.toURI().toString());
+inputFormat.setBlockSize(blockSize);
+
+inputFormat.configure(config);
+
+FileInputSplit[] inputSplits = 
inputFormat.createInputSplits(minNumSplits);
+
+Assert.assertEquals(

Review Comment:
   Thanks for reviewing; the suggestion is very helpful. I have reworked BinaryInputFormatTest to be based on AssertJ. Can you take a look when you are free?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27947) Introduce Spark Reader for table store

2022-06-13 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-27947:
-
Parent: FLINK-28039
Issue Type: Sub-task  (was: New Feature)

> Introduce Spark Reader for table store
> --
>
> Key: FLINK-27947
> URL: https://issues.apache.org/jira/browse/FLINK-27947
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.2.0
>
>
> Now that we have a more stable connector interface, we can grow the ecosystem
> a bit more.
> Apache Spark is a common batch computing engine, and a common scenario is:
> Flink streaming writes to storage, Spark reads from storage.
> So we can support a Spark reader.
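
The target experience, as a hypothetical sketch (the "tablestore" format name and the warehouse path are assumptions for illustration, not the actual connector API):

{code:java}
import org.apache.spark.sql.SparkSession;

public class SparkReadSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();
        // Read a table that Flink streaming has been writing to.
        spark.read()
                .format("tablestore") // hypothetical format name
                .load("/path/to/warehouse/default.db/t")
                .show();
    }
}
{code}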



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-23399) Add a performance benchmark for statebackend rescaling

2022-06-13 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei updated FLINK-23399:
---
Parent: FLINK-28038
Issue Type: Sub-task  (was: Improvement)

> Add a performance benchmark for statebackend rescaling
> --
>
> Key: FLINK-23399
> URL: https://issues.apache.org/jira/browse/FLINK-23399
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Affects Versions: 1.14.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Minor
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.16.0
>
>
> We noticed that rescaling is not covered by the current state benchmarks, so
> we'd like to introduce a benchmark in flink-benchmark that tests the performance
> of state backend restore during rescaling.
> The benchmark process is:
> (1) generate some states,
> (2) change the parallelism of the operator, and restore from the states
> generated before.
> The implementation of this benchmark is based on
> {{RocksIncrementalCheckpointRescalingTest}}, and *_AverageTime_* is used to
> measure the rescaling performance on each subtask.
>  
> This benchmark does not conflict with
> `RocksIncrementalCheckpointRescalingBenchmarkTest` in
> PR [#14893|https://github.com/apache/flink/pull/14893]. Compared with
> `RocksIncrementalCheckpointRescalingBenchmarkTest`, this benchmark supports
> testing rescaling on different state backends and has a finer granularity.
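
For readers unfamiliar with flink-benchmarks, here is a hedged, JMH-shaped sketch of such a benchmark; the class name and the setup steps are assumptions, not the actual patch:

{code:java}
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@BenchmarkMode(Mode.AverageTime) // matches the AverageTime metric mentioned above
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
public class RescalingBenchmarkSketch {

    @Setup(Level.Trial)
    public void setUp() {
        // (1) generate some states and take a checkpoint to restore from
    }

    @Benchmark
    public void restoreWithChangedParallelism() {
        // (2) restore one subtask's state backend under the new parallelism
    }
}
{code}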



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28039) Flink Table Store Ecosystem: Compute Engine Readers

2022-06-13 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-28039:


 Summary: Flink Table Store Ecosystem: Compute Engine Readers
 Key: FLINK-28039
 URL: https://issues.apache.org/jira/browse/FLINK-28039
 Project: Flink
  Issue Type: New Feature
  Components: Table Store
Reporter: Jingsong Lee
 Fix For: table-store-0.2.0


After some refactoring we have stable connector interfaces, so we can develop
compute engine connectors at a controlled cost.

The most classic scenario is writing with Flink and reading with other engines.

We can have readers for Apache Hive, Apache Spark and Trino.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-26560) Make the threshold of the overlap fraction of incremental restoring configurable

2022-06-13 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei updated FLINK-26560:
---
Parent: FLINK-28038
Issue Type: Sub-task  (was: Improvement)

> Make the threshold of the overlap fraction of incremental restoring 
> configurable
> 
>
> Key: FLINK-26560
> URL: https://issues.apache.org/jira/browse/FLINK-26560
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Affects Versions: 1.15.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> Currently, the threshold of the overlap fraction of incremental restoring 
> `OVERLAP_FRACTION_THRESHOLD` is a hard-coded, fixed value.
>  
> {code:java}
> public class RocksDBIncrementalCheckpointUtils {
>     /**
>      * The threshold of the overlap fraction of the handle's key-group range with
>      * target key-group range to be an initial handle.
>      */
>     private static final double OVERLAP_FRACTION_THRESHOLD = 0.75;
>     ...
> }
> {code}
>  
> `OVERLAP_FRACTION_THRESHOLD` is used to control how a state handle is restored;
> different thresholds can affect restore performance. The deletion behavior during
> restore changed with FLINK-21321, so the old threshold no longer fits the current
> situation.
> To make it easier to adjust the threshold for different situations, we suggest
> making `OVERLAP_FRACTION_THRESHOLD` configurable.
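
A hedged sketch of what the configurable threshold could look like as a Flink ConfigOption; the option key and description are assumptions, not the final API:

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class RescalingOptions {
    // Hypothetical option key; the default matches the current hard-coded value.
    public static final ConfigOption<Double> RESTORE_OVERLAP_FRACTION_THRESHOLD =
            ConfigOptions.key("state.backend.rocksdb.restore-overlap-fraction-threshold")
                    .doubleType()
                    .defaultValue(0.75)
                    .withDescription(
                            "The threshold of the overlap fraction between a handle's"
                                    + " key-group range and the target key-group range"
                                    + " for the handle to be used as the initial one.");
}
{code}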



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] dianfu commented on a diff in pull request #19896: [FLINK-27932][python] align noWatermarks and withWatermarkAlignment API

2022-06-13 Thread GitBox


dianfu commented on code in PR #19896:
URL: https://github.com/apache/flink/pull/19896#discussion_r896402211


##
docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md:
##
@@ -77,6 +77,19 @@ WatermarkStrategy
   })
 ```
 {{< /tab >}}
+{{< tab "Python" >}}

Review Comment:
   Could we also update the following page: 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/built_in/



##
docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md:
##
@@ -136,6 +149,26 @@ withTimestampsAndWatermarks
 .addSink(...)
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# currently read_file is not supported in PyFlink

Review Comment:
   Could we create a ticket for this missing functionality?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-28038) RocksDB rescaling improvement & rescaling benchmark - umbrella ticket

2022-06-13 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-28038:
--

 Summary: RocksDB rescaling improvement & rescaling benchmark - 
umbrella ticket
 Key: FLINK-28038
 URL: https://issues.apache.org/jira/browse/FLINK-28038
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.16.0
Reporter: Yanfei Lei


Placeholder umbrella ticket for RocksDB rescaling improvement.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-26941) Support Pattern end with notFollowedBy with window

2022-06-13 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-26941:
---

Assignee: Yue Ma

> Support Pattern end with notFollowedBy with window
> --
>
> Key: FLINK-26941
> URL: https://issues.apache.org/jira/browse/FLINK-26941
> Project: Flink
>  Issue Type: Improvement
>  Components: Library / CEP
>Affects Versions: 1.14.4
>Reporter: Yue Ma
>Assignee: Yue Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.4
>
>
> Currently a pattern sequence cannot end in notFollowedBy() in Flink CEP. But 
> in fact, this requirement exists in many scenarios. As mentioned in the 
> following tickets:
> https://issues.apache.org/jira/browse/FLINK-16010
> https://issues.apache.org/jira/browse/FLINK-9431
> Unfortunately, these tickets have not been active for a long time. But we still
> think this is an important feature for Flink CEP, so we would like to share
> our implementation.
> If we want to find the users who created an order but didn't pay within 10
> minutes, we could code it like this:
> {code:java}
> Pattern.<Event>begin("create").notFollowedBy("pay_order").within(Time.minutes(10))
> {code}
> If we receive the create event but don't receive the pay event within 10 
> minutes, then the match will be successful.
> The idea of implementation is basically the same as the design of FLINK-16010.
> A Pending State is introduced to represent the state of waiting for a 
> timeout, and there is a take edge between the Pending node and the Stop node.
> When advancing time, if the pending node is found to have timed out, the
> timed-out sequence is extracted and output normally as a successfully matched
> sequence.
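
A hedged, end-to-end sketch of how the proposed API could be used; Event and its getType() accessor are assumptions, and note that current Flink CEP still rejects patterns that end in notFollowedBy() without the proposed change:

{code:java}
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

public class NotFollowedByEndSketch {
    /** Hypothetical event POJO for illustration. */
    public static class Event {
        private String type;

        public String getType() {
            return type;
        }
    }

    public static Pattern<Event, ?> createdButNotPaidWithin10Minutes() {
        return Pattern.<Event>begin("create")
                .where(
                        new SimpleCondition<Event>() {
                            @Override
                            public boolean filter(Event e) {
                                return "create".equals(e.getType());
                            }
                        })
                // With the proposed change, the pattern may end here: the match
                // fires once the window expires without a "pay_order" event.
                .notFollowedBy("pay_order")
                .within(Time.minutes(10));
    }
}
{code}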



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] reswqa commented on a diff in pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType

2022-06-13 Thread GitBox


reswqa commented on code in PR #19927:
URL: https://github.com/apache/flink/pull/19927#discussion_r896405257


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##
@@ -1201,6 +1243,33 @@ private void verifyFractions(
 delta);
 }
 
+@Test
+public void testSetNonDefaultSlotSharingInHybridMode() {
+Configuration configuration = new Configuration();
+// set all edge to HYBRID result partition type.
+configuration.set(
+ExecutionOptions.BATCH_SHUFFLE_MODE,
+BatchShuffleMode.EXPERIMENTAL_ALL_EXCHANGES_HYBRID);
+
+final StreamGraph streamGraph = 
createStreamGraphForSlotSharingTest(configuration);
+// specify slot sharing group for map1
+streamGraph.getStreamNodes().stream()
+.filter(n -> "map1".equals(n.getOperatorName()))
+.findFirst()
+.get()
+.setSlotSharingGroup("testSlotSharingGroup");
+
+try {
+StreamingJobGraphGenerator.createJobGraph(streamGraph);
+fail("hybrid shuffle mode with non default slot sharing group 
should failed.");
+} catch (IllegalStateException e) {
+assertTrue(
+e.getMessage()
+.contains(
+"hybrid shuffle mode currently does not 
support setting slot sharing group"));

Review Comment:
   > This should use assertj's assertThatThrownBy()
   
   @zentol Nice suggestion, I have fixed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zhuzhurk commented on a diff in pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits

2022-06-13 Thread GitBox


zhuzhurk commented on code in PR #19946:
URL: https://github.com/apache/flink/pull/19946#discussion_r896403096


##
flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java:
##
@@ -160,11 +161,45 @@ public void testGetStatisticsMultiplePaths() throws 
IOException {
 stats.getTotalInputSize());
 }
 
+@Test
+public void testCreateInputSplitsWithEmptySplit() throws IOException {
+final int blockInfoSize = new BlockInfo().getInfoSize();
+final int blockSize = blockInfoSize + 8;
+final int numBlocks = 3;
+final int minNumSplits = 5;
+
+// create temporary file with 3 blocks
+final File tempFile =
+createBinaryInputFile(
+"test_create_input_splits_with_empty_split", 
blockSize, numBlocks);
+
+final Configuration config = new Configuration();
+config.setLong("input.block_size", blockSize + 10);
+
+final BinaryInputFormat inputFormat = new 
MyBinaryInputFormat();
+inputFormat.setFilePath(tempFile.toURI().toString());
+inputFormat.setBlockSize(blockSize);
+
+inputFormat.configure(config);
+
+FileInputSplit[] inputSplits = 
inputFormat.createInputSplits(minNumSplits);
+
+Assert.assertEquals(

Review Comment:
   Maybe add a hotfix commit "Rework BinaryInputFormatTest to be based on 
AssertJ" first. And then add the new test using AssertJ. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zhuzhurk commented on a diff in pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits

2022-06-13 Thread GitBox


zhuzhurk commented on code in PR #19946:
URL: https://github.com/apache/flink/pull/19946#discussion_r896402127


##
flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java:
##
@@ -160,11 +161,45 @@ public void testGetStatisticsMultiplePaths() throws 
IOException {
 stats.getTotalInputSize());
 }
 
+@Test
+public void testCreateInputSplitsWithEmptySplit() throws IOException {
+final int blockInfoSize = new BlockInfo().getInfoSize();
+final int blockSize = blockInfoSize + 8;
+final int numBlocks = 3;
+final int minNumSplits = 5;
+
+// create temporary file with 3 blocks
+final File tempFile =
+createBinaryInputFile(
+"test_create_input_splits_with_empty_split", 
blockSize, numBlocks);
+
+final Configuration config = new Configuration();
+config.setLong("input.block_size", blockSize + 10);
+
+final BinaryInputFormat inputFormat = new 
MyBinaryInputFormat();
+inputFormat.setFilePath(tempFile.toURI().toString());
+inputFormat.setBlockSize(blockSize);
+
+inputFormat.configure(config);
+
+FileInputSplit[] inputSplits = 
inputFormat.createInputSplits(minNumSplits);
+
+Assert.assertEquals(

Review Comment:
   Makes sense. Ref: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] godfreyhe commented on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2022-06-13 Thread GitBox


godfreyhe commented on PR #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-1154742694

   @libenchao, I hope you can continue to move this forward.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Aitozi commented on pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…

2022-06-13 Thread GitBox


Aitozi commented on PR #19840:
URL: https://github.com/apache/flink/pull/19840#issuecomment-1154742585

   Hi @KarmaGYZ, I have rebased this PR on top of the previous commit; please 
take another look, thanks. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on pull request #19882: [hotfix] fix a typo in NetworkBufferTest

2022-06-13 Thread GitBox


reswqa commented on PR #19882:
URL: https://github.com/apache/flink/pull/19882#issuecomment-1154728455

   @wsry Just a very small hotfix; would you like to take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa closed pull request #14498: [hotfix][typo]Fix typo in StreamingJobGraphGenerator.areOperatorsChainable

2022-06-13 Thread GitBox


reswqa closed pull request #14498: [hotfix][typo]Fix typo in 
StreamingJobGraphGenerator.areOperatorsChainable
URL: https://github.com/apache/flink/pull/14498


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on pull request #14498: [hotfix][typo]Fix typo in StreamingJobGraphGenerator.areOperatorsChainable

2022-06-13 Thread GitBox


reswqa commented on PR #14498:
URL: https://github.com/apache/flink/pull/14498#issuecomment-1154727170

   Fixed by FLINK-27903.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #19950: [hotfix] [docs] Translate a Chinese comment into English

2022-06-13 Thread GitBox


flinkbot commented on PR #19950:
URL: https://github.com/apache/flink/pull/19950#issuecomment-1154726348

   
   ## CI report:
   
   * 19abcb256444ba684736e355bf76986a79d5f501 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] diohabara opened a new pull request, #19950: [hotfix] [docs] Translate a Chinese comment into English

2022-06-13 Thread GitBox


diohabara opened a new pull request, #19950:
URL: https://github.com/apache/flink/pull/19950

   
   
   I translated a Chinese comment in the sample code into English so that more 
people can understand it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-28037) Flink SQL Upsert-Kafka cannot support Flink 1.14.x

2022-06-13 Thread Jiangfei Liu (Jira)
Jiangfei Liu created FLINK-28037:


 Summary: Flink SQL Upsert-Kafka cannot support Flink 1.14.x
 Key: FLINK-28037
 URL: https://issues.apache.org/jira/browse/FLINK-28037
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.14.4, 1.14.3, 1.14.2, 1.14.0
 Environment: Flink Version: 1.14.0 1.14.2 1.14.3 1.14.4
Reporter: Jiangfei Liu
 Attachments: kafka-sql.png, kafka-sql2.png

In Flink 1.14.x, the Flink SQL upsert-kafka sink cannot write data into a Kafka 
topic when the sink buffer flush config is set, e.g.
h5. sink.buffer-flush.max-rows
h5. sink.buffer-flush.interval

In Flink 1.13.x, the Flink SQL upsert-kafka sink can write data into a Kafka 
topic with the same sink buffer flush config.
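
For context, a minimal sketch of a sink table that sets the two buffer flush 
options (the schema, topic, servers, and formats below are illustrative 
placeholders, not taken from the report):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaBufferFlushExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());
        // Sink table using the two buffer flush options from the report above.
        tEnv.executeSql(
                "CREATE TABLE upsert_sink (\n"
                        + "  id STRING,\n"
                        + "  cnt BIGINT,\n"
                        + "  PRIMARY KEY (id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'upsert-kafka',\n"
                        + "  'topic' = 'output-topic',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'key.format' = 'json',\n"
                        + "  'value.format' = 'json',\n"
                        + "  'sink.buffer-flush.max-rows' = '1000',\n"
                        + "  'sink.buffer-flush.interval' = '1s'\n"
                        + ")");
    }
}
{code}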



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-15635) Allow passing a ClassLoader to EnvironmentSettings

2022-06-13 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-15635:

Parent: FLINK-14055
Issue Type: Sub-task  (was: Improvement)

> Allow passing a ClassLoader to EnvironmentSettings
> --
>
> Key: FLINK-15635
> URL: https://issues.apache.org/jira/browse/FLINK-15635
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> We had a couple of class loading issues in the past because people forgot to 
> use the right classloader in {{flink-table}}. The SQL Client executor code 
> hacks a classloader into the planner process by using {{wrapClassLoader}} 
> that sets the threads context classloader.
> Instead we should allow passing a class loader to environment settings. This 
> class loader can be passed to the planner and can be stored in table 
> environment, table config, etc. to have a consistent class loading behavior.
> Having this in place should replace the need for 
> {{Thread.currentThread().getContextClassLoader()}} in the entire 
> {{flink-table}} module.
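
For illustration, the proposed API could look roughly like this (a sketch; the 
withClassLoader builder method is an assumption of this sketch, not a 
confirmed API):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ClassLoaderExample {
    public static void main(String[] args) {
        // The classloader that loaded the user code, e.g. a jar-specific one.
        ClassLoader userClassLoader = ClassLoaderExample.class.getClassLoader();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance()
                        .withClassLoader(userClassLoader) // assumed method name
                        .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
    }
}
{code}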



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27828) FlinkKafkaProducer VS KafkaSink

2022-06-13 Thread Jiangfei Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553892#comment-17553892
 ] 

Jiangfei Liu commented on FLINK-27828:
--

checkpoint config:

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        env.enableCheckpointing(CHECKPOINT_INTERVAL);
        checkpointConfig.setCheckpointingMode(CHECKPOINT_MODE);
        checkpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT);
        
checkpointConfig.setTolerableCheckpointFailureNumber(CHECKPOINT_FAILURE_NUMBER);
        env.setRestartStrategy(RESTART_STRATEGY_CONFIGURATION);
        checkpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT);
        
checkpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE_BETWEEN);
        
checkpointConfig.setExternalizedCheckpointCleanup(CHECKPOINT_EXTERNALIZED_CLEANUP);
        checkpointConfig.setCheckpointStorage(new 
FileSystemCheckpointStorage(HDFS_BASE + CHECKPOINT_BASE_PATH + path));
        System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);

parallelism config: 3

 

 

> FlinkKafkaProducer VS KafkaSink
> ---
>
> Key: FLINK-27828
> URL: https://issues.apache.org/jira/browse/FLINK-27828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.14.3
>Reporter: Jiangfei Liu
>Priority: Major
> Attachments: Snipaste_2022-05-25_19-52-11.png
>
>
> Sorry, my English is bad.
> In Flink 1.14.3, writing 1 data to Kafka:
> when using FlinkKafkaProducer, it completed in 7s;
> when using KafkaSink, it completed in 1m40s.
> Why is KafkaSink so slow?



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27828) FlinkKafkaProducer VS KafkaSink

2022-06-13 Thread Jiangfei Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553891#comment-17553891
 ] 

Jiangfei Liu commented on FLINK-27828:
--

parallelism:3

checkpoint config:

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        env.enableCheckpointing(CHECKPOINT_INTERVAL);
        checkpointConfig.setCheckpointingMode(CHECKPOINT_MODE);
        checkpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT);
        
checkpointConfig.setTolerableCheckpointFailureNumber(CHECKPOINT_FAILURE_NUMBER);
        env.setRestartStrategy(RESTART_STRATEGY_CONFIGURATION);
        checkpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT);
        
checkpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE_BETWEEN);
        
checkpointConfig.setExternalizedCheckpointCleanup(CHECKPOINT_EXTERNALIZED_CLEANUP);
        checkpointConfig.setCheckpointStorage(new 
FileSystemCheckpointStorage(HDFS_BASE + CHECKPOINT_BASE_PATH + path));
        System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);

> FlinkKafkaProducer VS KafkaSink
> ---
>
> Key: FLINK-27828
> URL: https://issues.apache.org/jira/browse/FLINK-27828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.14.3
>Reporter: Jiangfei Liu
>Priority: Major
> Attachments: Snipaste_2022-05-25_19-52-11.png
>
>
> Sorry, my English is bad.
> In Flink 1.14.3, writing 1 data to Kafka:
> when using FlinkKafkaProducer, it completed in 7s;
> when using KafkaSink, it completed in 1m40s.
> Why is KafkaSink so slow?



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] (FLINK-27828) FlinkKafkaProducer VS KafkaSink

2022-06-13 Thread Jiangfei Liu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-27828 ]


Jiangfei Liu deleted comment on FLINK-27828:
--

was (Author: JIRAUSER290004):
parallelism:3

checkpoint config:

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        env.enableCheckpointing(CHECKPOINT_INTERVAL);
        checkpointConfig.setCheckpointingMode(CHECKPOINT_MODE);
        checkpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT);
        
checkpointConfig.setTolerableCheckpointFailureNumber(CHECKPOINT_FAILURE_NUMBER);
        env.setRestartStrategy(RESTART_STRATEGY_CONFIGURATION);
        checkpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT);
        
checkpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE_BETWEEN);
        
checkpointConfig.setExternalizedCheckpointCleanup(CHECKPOINT_EXTERNALIZED_CLEANUP);
        checkpointConfig.setCheckpointStorage(new 
FileSystemCheckpointStorage(HDFS_BASE + CHECKPOINT_BASE_PATH + path));
        System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);

> FlinkKafkaProducer VS KafkaSink
> ---
>
> Key: FLINK-27828
> URL: https://issues.apache.org/jira/browse/FLINK-27828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.14.3
>Reporter: Jiangfei Liu
>Priority: Major
> Attachments: Snipaste_2022-05-25_19-52-11.png
>
>
> Sorry, my English is bad.
> In Flink 1.14.3, writing 1 data to Kafka:
> when using FlinkKafkaProducer, it completed in 7s;
> when using KafkaSink, it completed in 1m40s.
> Why is KafkaSink so slow?



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-26721) PulsarSourceITCase.testSavepoint failed on azure pipeline

2022-06-13 Thread godfrey he (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553889#comment-17553889
 ] 

godfrey he commented on FLINK-26721:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36637&view=logs&j=aa18c3f6-13b8-5f58-86bb-c1cffb239496&t=502fb6c0-30a2-5e49-c5c2-a00fa3acb203

> PulsarSourceITCase.testSavepoint failed on azure pipeline
> -
>
> Key: FLINK-26721
> URL: https://issues.apache.org/jira/browse/FLINK-26721
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.16.0
>Reporter: Yun Gao
>Priority: Critical
>  Labels: build-stability
>
> {code:java}
> Mar 18 05:49:52 [ERROR] Tests run: 12, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 315.581 s <<< FAILURE! - in 
> org.apache.flink.connector.pulsar.source.PulsarSourceITCase
> Mar 18 05:49:52 [ERROR] 
> org.apache.flink.connector.pulsar.source.PulsarSourceITCase.testSavepoint(TestEnvironment,
>  DataStreamSourceExternalContext, CheckpointingMode)[1]  Time elapsed: 
> 140.803 s  <<< FAILURE!
> Mar 18 05:49:52 java.lang.AssertionError: 
> Mar 18 05:49:52 
> Mar 18 05:49:52 Expecting
> Mar 18 05:49:52   
> Mar 18 05:49:52 to be completed within 2M.
> Mar 18 05:49:52 
> Mar 18 05:49:52 exception caught while trying to get the future result: 
> java.util.concurrent.TimeoutException
> Mar 18 05:49:52   at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
> Mar 18 05:49:52   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
> Mar 18 05:49:52   at 
> org.assertj.core.internal.Futures.assertSucceededWithin(Futures.java:109)
> Mar 18 05:49:52   at 
> org.assertj.core.api.AbstractCompletableFutureAssert.internalSucceedsWithin(AbstractCompletableFutureAssert.java:400)
> Mar 18 05:49:52   at 
> org.assertj.core.api.AbstractCompletableFutureAssert.succeedsWithin(AbstractCompletableFutureAssert.java:396)
> Mar 18 05:49:52   at 
> org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase.checkResultWithSemantic(SourceTestSuiteBase.java:766)
> Mar 18 05:49:52   at 
> org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase.restartFromSavepoint(SourceTestSuiteBase.java:399)
> Mar 18 05:49:52   at 
> org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase.testSavepoint(SourceTestSuiteBase.java:241)
> Mar 18 05:49:52   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Mar 18 05:49:52   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Mar 18 05:49:52   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Mar 18 05:49:52   at java.lang.reflect.Method.invoke(Method.java:498)
> Mar 18 05:49:52   at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
> Mar 18 05:49:52   at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
> Mar 18 05:49:52   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
> Mar 18 05:49:52   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
> Mar 18 05:49:52   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
> Mar 18 05:49:52   at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92)
> Mar 18 05:49:52   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
> Mar 18 05:49:52   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
> Mar 18 05:49:52   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
> Mar 18 05:49:52   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
> Mar 18 05:49:52   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
> Mar 18 05:49:52   at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
> Mar 18 05:49:52   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
> Mar 18 05:49:52   at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
> Mar 18 05:49:52   at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$in

[GitHub] [flink] flinkbot commented on pull request #19949: [FLINK-28036][hive] HiveInspectors should use correct writable type to create ConstantObjectInspector

2022-06-13 Thread GitBox


flinkbot commented on PR #19949:
URL: https://github.com/apache/flink/pull/19949#issuecomment-1154694926

   
   ## CI report:
   
   * b2f5199f5d430f4684e63f5f4d4317690c7f53e2 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits

2022-06-13 Thread GitBox


reswqa commented on code in PR #19946:
URL: https://github.com/apache/flink/pull/19946#discussion_r896360739


##
flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java:
##
@@ -160,11 +161,45 @@ public void testGetStatisticsMultiplePaths() throws 
IOException {
 stats.getTotalInputSize());
 }
 
+@Test
+public void testCreateInputSplitsWithEmptySplit() throws IOException {
+final int blockInfoSize = new BlockInfo().getInfoSize();
+final int blockSize = blockInfoSize + 8;
+final int numBlocks = 3;
+final int minNumSplits = 5;
+
+// create temporary file with 3 blocks
+final File tempFile =
+createBinaryInputFile(
+"test_create_input_splits_with_empty_split", 
blockSize, numBlocks);
+
+final Configuration config = new Configuration();
+config.setLong("input.block_size", blockSize + 10);
+
+final BinaryInputFormat inputFormat = new 
MyBinaryInputFormat();
+inputFormat.setFilePath(tempFile.toURI().toString());
+inputFormat.setBlockSize(blockSize);
+
+inputFormat.configure(config);
+
+FileInputSplit[] inputSplits = 
inputFormat.createInputSplits(minNumSplits);
+
+Assert.assertEquals(

Review Comment:
   Would you be willing to use AssertJ instead of junit.Assert, so that the 
subsequent migration to JUnit 5 will be more convenient?
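   
   For example (a sketch; the expected split count below is an assumption for 
   illustration, since the original assertion is truncated above):
   
   ```java
   import static org.assertj.core.api.Assertions.assertThat;
   
   assertThat(inputSplits).hasSize(minNumSplits);
   ```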



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia opened a new pull request, #19949: [FLINK-28036][hive] HiveInspectors should use correct writable type to create ConstantObjectInspector

2022-06-13 Thread GitBox


luoyuxia opened a new pull request, #19949:
URL: https://github.com/apache/flink/pull/19949

   
   
   ## What is the purpose of the change
   To make `HiveInspectors` use the correct writable type to create 
`ConstantObjectInspector`.
   
   
   ## Brief change log
   Replace `org.apache.hadoop.io.ByteWritable`, 
`org.apache.hadoop.io.ShortWritable`, `org.apache.hadoop.io.DoubleWritable`  
with `org.apache.hadoop.hive.serde2.io.ByteWritable`, 
`org.apache.hadoop.hive.serde2.io.ShortWritable`, 
`org.apache.hadoop.hive.serde2.io.DoubleWritable` when creating 
`WritableConstantByteObjectInspector`, `WritableConstantShortObjectInspector`, 
`WritableConstantDoubleObjectInspector`.
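   
   To illustrate the mismatch (a minimal sketch; both classes provide a double 
   constructor):
   
   ```java
   // Hadoop writable: not accepted by WritableConstantDoubleObjectInspector's
   // constructor, which causes the "no such method" failure described above.
   org.apache.hadoop.io.DoubleWritable hadoopWritable =
           new org.apache.hadoop.io.DoubleWritable(1.0d);
   
   // Hive serde2 writable: the type the constant inspector actually expects.
   org.apache.hadoop.hive.serde2.io.DoubleWritable hiveWritable =
           new org.apache.hadoop.hive.serde2.io.DoubleWritable(1.0d);
   ```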
   
   
   ## Verifying this change
   Existing tests and manual verification. 
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  no
 - The serializers:  no
 - The runtime per-record code paths (performance sensitive):  no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper:  no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature?  no
 - If yes, how is the feature documented?  N/A
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28036) HiveInspectors should use correct writable type to create ConstantObjectInspector

2022-06-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-28036:
---
Labels: pull-request-available  (was: )

> HiveInspectors should use correct writable type to create 
> ConstantObjectInspector
> -
>
> Key: FLINK-28036
> URL: https://issues.apache.org/jira/browse/FLINK-28036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: luoyuxia
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> In HiveInspectors, we create Hive's ConstantObjectInspector by passing a 
> writable type such as ByteWritable or DoubleWritable, all of which are 
> classes from the package org.apache.hadoop.io. But Hive's 
> ConstantObjectInspector may require a different class; for example, 
> WritableConstantDoubleObjectInspector requires 
> org.apache.hadoop.hive.serde2.io.DoubleWritable, which belongs to the 
> package org.apache.hadoop.hive.serde2.io. When such code is reached, it 
> throws "no such method exception: 
> org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector.<init>(org.apache.hadoop.io.DoubleWritable)".
>  
> I found that ByteWritable, ShortWritable, and DoubleWritable should be the 
> classes in the package `org.apache.hadoop.hive.serde2.io` instead of 
> `org.apache.hadoop.io`.
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] snuyanzin commented on pull request #18779: [hotfix][javadoc] Misprint in javadoc at QuadFunction.java

2022-06-13 Thread GitBox


snuyanzin commented on PR #18779:
URL: https://github.com/apache/flink/pull/18779#issuecomment-1154691101

   @MartijnVisser Sorry for the poke.
   Could you please have a look once you have time?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-27658) FlinkUserCodeClassLoader expose addURL method to allow to register jar dynamically

2022-06-13 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-27658.
---
  Assignee: dalongliu
Resolution: Fixed

Done via:

2e37a06368596ca9ed04366a879f2facfb7ef509

> FlinkUserCodeClassLoader expose addURL method to allow to register jar 
> dynamically
> --
>
> Key: FLINK-27658
> URL: https://issues.apache.org/jira/browse/FLINK-27658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task, Table SQL / Planner
>Affects Versions: 1.16.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] zhuzhurk merged pull request #19860: [FLINK-27658][table] FlinkUserCodeClassLoader expose addURL method to allow to register jar dynamically

2022-06-13 Thread GitBox


zhuzhurk merged PR #19860:
URL: https://github.com/apache/flink/pull/19860


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #19849: [FLINK-27767][sql-gateway] Introduce Endpoint API and utils

2022-06-13 Thread GitBox


reswqa commented on code in PR #19849:
URL: https://github.com/apache/flink/pull/19849#discussion_r896357754


##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/session/SessionManagerTest.java:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.session;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions;
+import org.apache.flink.table.gateway.common.session.SessionEnvironment;
+import org.apache.flink.table.gateway.common.session.SessionHandle;
+import org.apache.flink.table.gateway.common.utils.MockedEndpointVersion;
+import org.apache.flink.table.gateway.common.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Test for {@link SessionManager}. */
+public class SessionManagerTest {

Review Comment:
   A small suggestion: I think we should now use JUnit 5 and AssertJ in newly 
introduced tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #19939: [FLINK-27983][FLINK-27984][FLINK-27985] Introduce SupportsStatisticsReport, FileBasedStatisticsReportableDecodingFormat, FlinkRecomp

2022-06-13 Thread GitBox


reswqa commented on code in PR #19939:
URL: https://github.com/apache/flink/pull/19939#discussion_r896353277


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemStatisticsReportTest.java:
##
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.core.TableScan;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test for statistics functionality in {@link FileSystemTableSource}. */
+public class FileSystemStatisticsReportTest extends TableTestBase {

Review Comment:
   I think we should now use JUnit 5 and AssertJ in newly introduced tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #19939: [FLINK-27983][FLINK-27984][FLINK-27985] Introduce SupportsStatisticsReport, FileBasedStatisticsReportableDecodingFormat, FlinkRecomp

2022-06-13 Thread GitBox


reswqa commented on code in PR #19939:
URL: https://github.com/apache/flink/pull/19939#discussion_r896352923


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java:
##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsStatisticReport;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
+import 
org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle;
+import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED;
+
+/**
+ * A FlinkOptimizeProgram that recompute statistics after partition pruning 
and filter push down.
+ *
+ * It's a very heavy operation to get statistics from catalogs or 
connectors, so this centralized
+ * way can avoid getting statistics again and again.
+ */
+public class FlinkRecomputeStatisticsProgram implements 
FlinkOptimizeProgram {

Review Comment:
   I think we should now use JUnit 5 and AssertJ in newly introduced tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zhoulii commented on a diff in pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits

2022-06-13 Thread GitBox


zhoulii commented on code in PR #19946:
URL: https://github.com/apache/flink/pull/19946#discussion_r896348640


##
flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java:
##
@@ -160,6 +160,44 @@ public void testGetStatisticsMultiplePaths() throws 
IOException {
 stats.getTotalInputSize());
 }
 
+@Test
+public void testCreateInputSplitsWithEmptySplit() throws IOException {
+// create temporary file with 3 blocks
+final File tempFile = File.createTempFile("binary_input_format_test", 
"tmp");
+tempFile.deleteOnExit();

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28034) ClassCastException occurred in creating a checkpoint with merge windows

2022-06-13 Thread Takayuki Eimizu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553883#comment-17553883
 ] 

Takayuki Eimizu commented on FLINK-28034:
-

I have already implemented a fix for this issue and will create a pull request 
shortly.

> ClassCastException occurred in creating a checkpoint with merge windows 
> 
>
> Key: FLINK-28034
> URL: https://issues.apache.org/jira/browse/FLINK-28034
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.15.0
>Reporter: Takayuki Eimizu
>Priority: Major
>
> h1. Summary
> In Flink 1.15.0, the combination of the following features always causes a 
> ClassCastException.
>  - Session Window
>  - Checkpoint
>  - Keyed State
> The following repository provides minimal source code that can combine these 
> features to reproduce the exception.
> [https://github.com/t-eimizu/flink-checkpoint-with-merging-window]
>  
> h1. Description
> h2. How the Exception Occurred
>  
> In the process window function of the session window, we must use 
> `context.globalState()`
> instead of `context.windowState()`. If you use `context.windowState()` in 
> this situation, Flink throws `UnsupportedOperationException`.
>  
> So we have to do the following:
>  
> {code:java}
>stPreviousValue = context.globalState().getState(desc4PreviousValue); 
> {code}
>  
> Then stPreviousValue will have the following fields:
> ||Field Name||Value||
> |currentNamespace|VoidNamespace|
> |namespaceSerializer|TimeWindow$serializer|
> As a result, when Flink creates a checkpoint for this job, a 
> ClassCastException occurs.
> {code:java}
> 2022-06-14 11:04:57,212 INFO  
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - 
> ProcessingData -> Sink: PrintData (1/1)#0 - asynchronous part of checkpoint 1 
> could not be completed. java.util.concurrent.ExecutionException: 
> java.lang.ClassCastException: class 
> org.apache.flink.runtime.state.VoidNamespace cannot be cast to class 
> org.apache.flink.streaming.api.windowing.windows.TimeWindow 
> (org.apache.flink.runtime.state.VoidNamespace and 
> org.apache.flink.streaming.api.windowing.windows.TimeWindow are in unnamed 
> module of loader 'app') at 
> java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?] at 
> java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?] at 
> org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:645)
>  ~[flink-core-1.15.0.jar:1.15.0] at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:54)
>  ~[flink-streaming-java-1.15.0.jar:1.15.0] at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
>  ~[flink-streaming-java-1.15.0.jar:1.15.0] at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
>  [flink-streaming-java-1.15.0.jar:1.15.0] at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  [?:?] at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  [?:?] at java.lang.Thread.run(Thread.java:834) [?:?] Caused by: 
> java.lang.ClassCastException: class 
> org.apache.flink.runtime.state.VoidNamespace cannot be cast to class 
> org.apache.flink.streaming.api.windowing.windows.TimeWindow 
> (org.apache.flink.runtime.state.VoidNamespace and 
> org.apache.flink.streaming.api.windowing.windows.TimeWindow are in unnamed 
> module of loader 'app') at 
> org.apache.flink.streaming.api.windowing.windows.TimeWindow$Serializer.serialize(TimeWindow.java:130)
>  ~[flink-streaming-java-1.15.0.jar:1.15.0] at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:145)
>  ~[flink-runtime-1.15.0.jar:1.15.0] at 
> org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:116)
>  ~[flink-runtime-1.15.0.jar:1.15.0] at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:38)
>  ~[flink-runtime-1.15.0.jar:1.15.0] at 
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.lambda$asyncSnapshot$3(HeapSnapshotStrategy.java:172)
>  ~[flink-runtime-1.15.0.jar:1.15.0] at 
> org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)
>  ~[flink-runtime-1.15.0.jar:1.15.0] at 
> org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)
>  ~[flink-runtime-1.15.0.jar:1.15.0] at 
> org.apache.flink.runti

[GitHub] [flink] PatrickRen commented on pull request #19456: [FLINK-27041][connector/kafka] Catch IllegalStateException in KafkaPartitionSplitReader.fetch() to handle no valid partition case

2022-06-13 Thread GitBox


PatrickRen commented on PR #19456:
URL: https://github.com/apache/flink/pull/19456#issuecomment-1154675519

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28036) HiveInspectors should use correct writable type to creaet ConstantObjectInspector

2022-06-13 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia updated FLINK-28036:
-
Description: 
In HiveInspectors, we create Hive's ConstantObjectInspector by passing a 
writable type such as ByteWritable or DoubleWritable, all of which are classes 
from the package org.apache.hadoop.io. But Hive's ConstantObjectInspector may 
require a different class; for example, WritableConstantDoubleObjectInspector 
requires org.apache.hadoop.hive.serde2.io.DoubleWritable, which belongs to the 
package org.apache.hadoop.hive.serde2.io. When such code is reached, it throws 
"no such method exception: 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector.<init>(org.apache.hadoop.io.DoubleWritable)".

I found that ByteWritable, ShortWritable, and DoubleWritable should be the 
classes in the package `org.apache.hadoop.hive.serde2.io` instead of 
`org.apache.hadoop.io`.

 

  was:In 


> HiveInspectors should use correct writable type to create 
> ConstantObjectInspector
> -
>
> Key: FLINK-28036
> URL: https://issues.apache.org/jira/browse/FLINK-28036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: luoyuxia
>Priority: Major
> Fix For: 1.16.0
>
>
> In HiveInspectors, we create Hive's ConstantObjectInspector by passing a 
> writable type such as ByteWritable or DoubleWritable, all of which are 
> classes from the package org.apache.hadoop.io. But Hive's 
> ConstantObjectInspector may require a different class; for example, 
> WritableConstantDoubleObjectInspector requires 
> org.apache.hadoop.hive.serde2.io.DoubleWritable, which belongs to the 
> package org.apache.hadoop.hive.serde2.io. When such code is reached, it 
> throws "no such method exception: 
> org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector.<init>(org.apache.hadoop.io.DoubleWritable)".
>  
> I found that ByteWritable, ShortWritable, and DoubleWritable should be the 
> classes in the package `org.apache.hadoop.hive.serde2.io` instead of 
> `org.apache.hadoop.io`.
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] dianfu commented on a diff in pull request #19295: [FLINK-26941][cep] Support Pattern end with notFollowedBy with window

2022-06-13 Thread GitBox


dianfu commented on code in PR #19295:
URL: https://github.com/apache/flink/pull/19295#discussion_r895598233


##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java:
##
@@ -261,15 +261,31 @@ public Collection>, Long>> 
advanceTime(
 final NFAState nfaState,
 final long timestamp)
 throws Exception {
+return advanceTimeAndHandlePendingState(sharedBufferAccessor, 
nfaState, timestamp).f1;
+}
+
+public Tuple2>>, 
Collection>, Long>>>
+advanceTimeAndHandlePendingState(

Review Comment:
   There is no need to introduce advanceTimeAndHandlePendingState. We could 
just update the signature of advanceTime if needed.



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##
@@ -304,6 +305,18 @@ private State createMiddleStates(final State 
sinkState) {
 if (currentPattern.getQuantifier().getConsumingStrategy()
 == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
 // skip notFollow patterns, they are converted into edge 
conditions
+if (currentPattern.getWindowTime() != null

Review Comment:
   We should use `windowTime` instead of `currentPattern.getWindowTime()`. This 
also means that we need to calculate `windowTime` in advance. 



##
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java:
##
@@ -630,6 +630,102 @@ public String select(Map> pattern) {
 assertEquals(expected, resultList);
 }
 
+@Test
+public void testNotFollowedByWithIn() throws Exception {

Review Comment:
   Could we support the following pattern? If so, it would be great to add an 
ITCase.
   ```
   Pattern.begin('A').notFollowedBy('B').followedBy('C').times(0, 
2).withIn(Time.milliseconds(3))
   ```



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##
@@ -158,9 +158,10 @@ public static boolean canProduceEmptyMatches(final 
Pattern pattern) {
  */
 void compileFactory() {
 if (currentPattern.getQuantifier().getConsumingStrategy()
-== Quantifier.ConsumingStrategy.NOT_FOLLOW) {
+== Quantifier.ConsumingStrategy.NOT_FOLLOW

Review Comment:
   Currently, the windowTime is the minimum of all window times, so it may 
happen that the window time is defined in another Pattern. What about moving 
this check to the end of this method?



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##
@@ -304,6 +305,18 @@ private State createMiddleStates(final State 
sinkState) {
 if (currentPattern.getQuantifier().getConsumingStrategy()
 == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
 // skip notFollow patterns, they are converted into edge 
conditions
+if (currentPattern.getWindowTime() != null
+&& currentPattern.getWindowTime().toMilliseconds() 
> 0
+&& sinkState.isFinal()) {
+final State notFollow =
+createState(currentPattern.getName(), 
State.StateType.Pending);
+final IterativeCondition notCondition = 
getTakeCondition(currentPattern);
+final State stopState =
+createStopState(notCondition, 
currentPattern.getName());
+notFollow.addTake(stopState, notCondition);

Review Comment:
   ```suggestion
   notFollow.addProceed(stopState, notCondition);
   ```



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##
@@ -304,6 +305,18 @@ private State createMiddleStates(final State 
sinkState) {
 if (currentPattern.getQuantifier().getConsumingStrategy()
 == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
 // skip notFollow patterns, they are converted into edge 
conditions
+if (currentPattern.getWindowTime() != null
+&& currentPattern.getWindowTime().toMilliseconds() 
> 0
+&& sinkState.isFinal()) {

Review Comment:
   ```suggestion
   && lastSink.isFinal()) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28036) HiveInspectors should use correct writable type to create ConstantObjectInspector

2022-06-13 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia updated FLINK-28036:
-
Summary: HiveInspectors should use correct writable type to create 
ConstantObjectInspector  (was: Use correct writable type to create 
ConstantObjectInspector)

> HiveInspectors should use correct writable type to create 
> ConstantObjectInspector
> -
>
> Key: FLINK-28036
> URL: https://issues.apache.org/jira/browse/FLINK-28036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: luoyuxia
>Priority: Major
> Fix For: 1.16.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28036) Use correct writable type to create ConstantObjectInspector

2022-06-13 Thread luoyuxia (Jira)
luoyuxia created FLINK-28036:


 Summary: Use correct writable type to create 
ConstantObjectInspector
 Key: FLINK-28036
 URL: https://issues.apache.org/jira/browse/FLINK-28036
 Project: Flink
  Issue Type: Sub-task
Reporter: luoyuxia






--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-28036) Use correct writable type to create ConstantObjectInspector

2022-06-13 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia updated FLINK-28036:
-
Fix Version/s: 1.16.0

> Use correct writable type to create ConstantObjectInspector
> ---
>
> Key: FLINK-28036
> URL: https://issues.apache.org/jira/browse/FLINK-28036
> Project: Flink
>  Issue Type: Sub-task
>Reporter: luoyuxia
>Priority: Major
> Fix For: 1.16.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-28036) Use correct writable type to create ConstantObjectInspector

2022-06-13 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia updated FLINK-28036:
-
Component/s: Connectors / Hive

> Use correct writable type to create ConstantObjectInspector
> ---
>
> Key: FLINK-28036
> URL: https://issues.apache.org/jira/browse/FLINK-28036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: luoyuxia
>Priority: Major
> Fix For: 1.16.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-28036) HiveInspectors should use correct writable type to create ConstantObjectInspector

2022-06-13 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia updated FLINK-28036:
-
Description: In 

> HiveInspectors should use correct writable type to create 
> ConstantObjectInspector
> -
>
> Key: FLINK-28036
> URL: https://issues.apache.org/jira/browse/FLINK-28036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: luoyuxia
>Priority: Major
> Fix For: 1.16.0
>
>
> In 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27395) IllegalStateException: Could not find policy 'pick_first'. on Flink Application

2022-06-13 Thread Natan HP (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Natan HP updated FLINK-27395:
-
Description: 
I got this exception on flink taskmanager, but I can see that the data is 
successfully published in the pub sub. Here is the log:

 
{noformat}
2022-04-25 07:53:44,293 INFO  org.apache.flink.runtime.taskmanager.Task         
           
[] - Sink: Pub-sub-sink (1/2)#0 (cffc263c60c6c0c8c56092a8386601eb) switched 
from INITIALIZING to RUNNING.
Apr 25, 2022 7:53:45 AM io.grpc.internal.ManagedChannelImpl$1 uncaughtException
SEVERE: [Channel<1>: (pubsub.googleapis.com:443)] Uncaught exception in the 
SynchronizationContext. Panic!
java.lang.IllegalStateException: Could not find policy 'pick_first'. Make sure 
its implementation is either registered to LoadBalancerRegistry or included in 
META-INF/services/io.grpc.LoadBalancerProvider from your jar files.
    at 
io.grpc.internal.AutoConfiguredLoadBalancerFactory$AutoConfiguredLoadBalancer.(AutoConfiguredLoadBalancerFactory.java:94)
    at 
io.grpc.internal.AutoConfiguredLoadBalancerFactory.newLoadBalancer(AutoConfiguredLoadBalancerFactory.java:65)
    at 
io.grpc.internal.ManagedChannelImpl.exitIdleMode(ManagedChannelImpl.java:375)
    at 
io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider$1ExitIdleModeForTransport.run(ManagedChannelImpl.java:469)
    at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
    at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
    at 
io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider.get(ManagedChannelImpl.java:473)
    at io.grpc.internal.ClientCallImpl.startInternal(ClientCallImpl.java:253)
    at io.grpc.internal.ClientCallImpl.start(ClientCallImpl.java:210)
    at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:32)
    at 
com.google.api.gax.grpc.GrpcHeaderInterceptor$1.start(GrpcHeaderInterceptor.java:94)
    at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:314)
    at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:288)
    at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:200)
    at 
com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58)
    at 
com.google.api.gax.grpc.GrpcUnaryRequestParamCallable.futureCall(GrpcUnaryRequestParamCallable.java:65)
    at 
com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64)
    at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:86)
    at 
com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63)
    at 
com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41)
    at 
com.google.api.gax.tracing.TracedBatchingCallable.futureCall(TracedBatchingCallable.java:82)
    at 
com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79)
    at com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126)
    at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87)
    at com.google.cloud.pubsub.v1.Publisher.publishCall(Publisher.java:425)
    at 
com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:471)
    at 
com.google.cloud.pubsub.v1.Publisher.publishAllWithoutInflight(Publisher.java:399)
    at com.google.cloud.pubsub.v1.Publisher.access$1100(Publisher.java:88)
    at com.google.cloud.pubsub.v1.Publisher$2.run(Publisher.java:326)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
{noformat}
 

 

The code sample:
{code:java}
SinkFunction<String> pubsubSink = PubSubSink.newBuilder() 
.withSerializationSchema((SerializationSchema<String>) s -> 
s.getBytes(StandardCharsets.UTF_8)) 
.withProjectName("") 
.withTopicName("")
.build(); 

dataStream.addSink(pubsubSink) 
.name("Pub-sub-sink");  {code}
 

I use Maven Assembly Plugin to create the uber JAR:
{noformat}
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>2.6</version>
    <configuration>
        <archive>
            <manifest>
                <mainClass>org.example.flink.Main</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
{noformat}
 

The content of the JAR:
{noformat}
➜ jar tf MyApp.jar | grep io.grpc.LoadBalancerProvider       
io/grpc/LoadBalancerProvider$UnknownConfig.class
META-INF/services/io.grpc.Lo
{noformat}
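
The pattern here is that the assembly plugin's `jar-with-dependencies` descriptor keeps only one copy of each duplicate `META-INF/services` file, so grpc's `io.grpc.LoadBalancerProvider` registrations from the other dependencies are dropped from the uber JAR. A common remedy, sketched below as an assumption rather than the resolution recorded in this issue, is to build the uber JAR with the Maven Shade Plugin and its `ServicesResourceTransformer`, which merges service files instead of overwriting them:

{noformat}
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.4</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- merge META-INF/services entries from all dependencies -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>org.example.flink.Main</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
{noformat}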

[jira] [Closed] (FLINK-27395) IllegalStateException: Could not find policy 'pick_first'. on Flink Application

2022-06-13 Thread Natan HP (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Natan HP closed FLINK-27395.


> IllegalStateException: Could not find policy 'pick_first'. on Flink 
> Application
> ---
>
> Key: FLINK-27395
> URL: https://issues.apache.org/jira/browse/FLINK-27395
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Google Cloud PubSub
>Affects Versions: 1.15.0, 1.14.2, 1.14.4
> Environment: # Minikube
> {noformat}
> ➜ minikube version
> minikube version: v1.25.2
> commit: 362d5fdc0a3dbee389b3d3f1034e8023e72bd3a7
> {noformat}
>  # Apache Flink Docker Images
> {noformat}
> apache/flink:1.14.4-scala_2.11{noformat}
> {noformat}
> apache/flink:1.15.0-scala_2.12-java11{noformat}
>Reporter: Natan HP
>Priority: Major
>
> I got this exception on the Flink taskmanager, but I can see that the data is 
> successfully published to Pub/Sub. Here is the log:
>  
> {noformat}
> 2022-04-25 07:53:44,293 INFO  org.apache.flink.runtime.taskmanager.Task       
>              
> [] - Sink: Pub-sub-sink (1/2)#0 (cffc263c60c6c0c8c56092a8386601eb) switched 
> from INITIALIZING to RUNNING.
> Apr 25, 2022 7:53:45 AM io.grpc.internal.ManagedChannelImpl$1 
> uncaughtException
> SEVERE: [Channel<1>: (pubsub.googleapis.com:443)] Uncaught exception in the 
> SynchronizationContext. Panic!
> java.lang.IllegalStateException: Could not find policy 'pick_first'. Make 
> sure its implementation is either registered to LoadBalancerRegistry or 
> included in META-INF/services/io.grpc.LoadBalancerProvider from your jar 
> files.
>     at 
> io.grpc.internal.AutoConfiguredLoadBalancerFactory$AutoConfiguredLoadBalancer.<init>(AutoConfiguredLoadBalancerFactory.java:94)
>     at 
> io.grpc.internal.AutoConfiguredLoadBalancerFactory.newLoadBalancer(AutoConfiguredLoadBalancerFactory.java:65)
>     at 
> io.grpc.internal.ManagedChannelImpl.exitIdleMode(ManagedChannelImpl.java:375)
>     at 
> io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider$1ExitIdleModeForTransport.run(ManagedChannelImpl.java:469)
>     at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
>     at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
>     at 
> io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider.get(ManagedChannelImpl.java:473)
>     at io.grpc.internal.ClientCallImpl.startInternal(ClientCallImpl.java:253)
>     at io.grpc.internal.ClientCallImpl.start(ClientCallImpl.java:210)
>     at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:32)
>     at 
> com.google.api.gax.grpc.GrpcHeaderInterceptor$1.start(GrpcHeaderInterceptor.java:94)
>     at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:314)
>     at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:288)
>     at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:200)
>     at 
> com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58)
>     at 
> com.google.api.gax.grpc.GrpcUnaryRequestParamCallable.futureCall(GrpcUnaryRequestParamCallable.java:65)
>     at 
> com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64)
>     at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:86)
>     at 
> com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63)
>     at 
> com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41)
>     at 
> com.google.api.gax.tracing.TracedBatchingCallable.futureCall(TracedBatchingCallable.java:82)
>     at 
> com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79)
>     at 
> com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126)
>     at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87)
>     at com.google.cloud.pubsub.v1.Publisher.publishCall(Publisher.java:425)
>     at 
> com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:471)
>     at 
> com.google.cloud.pubsub.v1.Publisher.publishAllWithoutInflight(Publisher.java:399)
>     at com.google.cloud.pubsub.v1.Publisher.access$1100(Publisher.java:88)
>     at com.google.cloud.pubsub.v1.Publisher$2.run(Publisher.java:326)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

[jira] [Resolved] (FLINK-27395) IllegalStateException: Could not find policy 'pick_first'. on Flink Application

2022-06-13 Thread Natan HP (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Natan HP resolved FLINK-27395.
--
Resolution: Resolved

> IllegalStateException: Could not find policy 'pick_first'. on Flink 
> Application
> ---
>
> Key: FLINK-27395
> URL: https://issues.apache.org/jira/browse/FLINK-27395
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Google Cloud PubSub
>Affects Versions: 1.15.0, 1.14.2, 1.14.4
> Environment: # Minikube
> {noformat}
> ➜ minikube version
> minikube version: v1.25.2
> commit: 362d5fdc0a3dbee389b3d3f1034e8023e72bd3a7
> {noformat}
>  # Apache Flink Docker Images
> {noformat}
> apache/flink:1.14.4-scala_2.11{noformat}
> {noformat}
> apache/flink:1.15.0-scala_2.12-java11{noformat}
>Reporter: Natan HP
>Priority: Major
>
> I got this exception on the Flink taskmanager, but I can see that the data is 
> successfully published to Pub/Sub. Here is the log:
>  
> {noformat}
> 2022-04-25 07:53:44,293 INFO  org.apache.flink.runtime.taskmanager.Task       
>              
> [] - Sink: Pub-sub-sink (1/2)#0 (cffc263c60c6c0c8c56092a8386601eb) switched 
> from INITIALIZING to RUNNING.
> Apr 25, 2022 7:53:45 AM io.grpc.internal.ManagedChannelImpl$1 
> uncaughtException
> SEVERE: [Channel<1>: (pubsub.googleapis.com:443)] Uncaught exception in the 
> SynchronizationContext. Panic!
> java.lang.IllegalStateException: Could not find policy 'pick_first'. Make 
> sure its implementation is either registered to LoadBalancerRegistry or 
> included in META-INF/services/io.grpc.LoadBalancerProvider from your jar 
> files.
>     at 
> io.grpc.internal.AutoConfiguredLoadBalancerFactory$AutoConfiguredLoadBalancer.<init>(AutoConfiguredLoadBalancerFactory.java:94)
>     at 
> io.grpc.internal.AutoConfiguredLoadBalancerFactory.newLoadBalancer(AutoConfiguredLoadBalancerFactory.java:65)
>     at 
> io.grpc.internal.ManagedChannelImpl.exitIdleMode(ManagedChannelImpl.java:375)
>     at 
> io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider$1ExitIdleModeForTransport.run(ManagedChannelImpl.java:469)
>     at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
>     at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
>     at 
> io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider.get(ManagedChannelImpl.java:473)
>     at io.grpc.internal.ClientCallImpl.startInternal(ClientCallImpl.java:253)
>     at io.grpc.internal.ClientCallImpl.start(ClientCallImpl.java:210)
>     at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:32)
>     at 
> com.google.api.gax.grpc.GrpcHeaderInterceptor$1.start(GrpcHeaderInterceptor.java:94)
>     at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:314)
>     at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:288)
>     at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:200)
>     at 
> com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58)
>     at 
> com.google.api.gax.grpc.GrpcUnaryRequestParamCallable.futureCall(GrpcUnaryRequestParamCallable.java:65)
>     at 
> com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64)
>     at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:86)
>     at 
> com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63)
>     at 
> com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41)
>     at 
> com.google.api.gax.tracing.TracedBatchingCallable.futureCall(TracedBatchingCallable.java:82)
>     at 
> com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79)
>     at 
> com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126)
>     at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87)
>     at com.google.cloud.pubsub.v1.Publisher.publishCall(Publisher.java:425)
>     at 
> com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:471)
>     at 
> com.google.cloud.pubsub.v1.Publisher.publishAllWithoutInflight(Publisher.java:399)
>     at com.google.cloud.pubsub.v1.Publisher.access$1100(Publisher.java:88)
>     at com.google.cloud.pubsub.v1.Publisher$2.run(Publisher.java:326)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(

[GitHub] [flink] zhuzhurk commented on a diff in pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits

2022-06-13 Thread GitBox


zhuzhurk commented on code in PR #19946:
URL: https://github.com/apache/flink/pull/19946#discussion_r896335558


##
flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java:
##
@@ -160,6 +160,44 @@ public void testGetStatisticsMultiplePaths() throws 
IOException {
 stats.getTotalInputSize());
 }
 
+@Test
+public void testCreateInputSplitsWithEmptySplit() throws IOException {
+// create temporary file with 3 blocks
+final File tempFile = File.createTempFile("binary_input_format_test", 
"tmp");
+tempFile.deleteOnExit();

Review Comment:
   The `tempFile` can be deleted only when the whole JVM exits. I would suggest 
using `TemporaryFolder`, which can be cleaned up after the test method finishes. 
`FlinkUserCodeClassLoadersTest` can be an example to reference.
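
   For illustration, a rough sketch of that suggestion (class and file names are assumed, not taken from this PR):

```java
import java.io.File;
import java.io.IOException;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class BinaryInputFormatTestSketch {

    // JUnit deletes everything under this folder after each test method,
    // instead of relying on File#deleteOnExit and a JVM shutdown
    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Test
    public void testCreateInputSplitsWithEmptySplit() throws IOException {
        // the temporary file lives only for the duration of this test
        final File tempFile = temporaryFolder.newFile("binary_input_format_test.tmp");
        // ... write the blocks and exercise createInputSplits() here ...
    }
}
```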



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-28003) An unexpected replacement happened in SqlClient, for example, replacing 'pos ' with 'POSITION'

2022-06-13 Thread Jing Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553870#comment-17553870
 ] 

Jing Zhang edited comment on FLINK-28003 at 6/14/22 3:14 AM:
-

As discussed with [~fsk119] offline, we would add a configuration option to 
disable SQL completion in SqlClient. Besides, for '-f sqlFile', it makes sense 
to disable SQL completion by default.


was (Author: qingru zhang):
As discussed with [~fsk119] offline, we add a configuration option to disable 
SQL completion in SqlClient. Besides, for '-f sqlFile', it makes sense to 
disable SQL completion by default.

> An unexpected replacement happened in SqlClient, for example, replacing 
> 'pos  ' with 'POSITION'
> --
>
> Key: FLINK-28003
> URL: https://issues.apache.org/jira/browse/FLINK-28003
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.15.0
>Reporter: Jing Zhang
>Assignee: Shengkai Fang
>Priority: Major
> Attachments: zj_test.sql
>
>
> When I run the following sql in SqlClient using 'sql-client.sh -f zj_test.sql'
> {code:java}
> create table if not exists db.zj_test(
> pos                   int,
> rank_cmd              string
> )
> partitioned by (
> `p_date` string,
> `p_hourmin` string);
> INSERT OVERWRITE TABLE db.zj_test PARTITION (p_date='20220605', p_hourmin = 
> '0100')
> SELECT
> pos ,
> rank_cmd
> FROM db.sourceT
> where p_date = '20220605' and p_hourmin = '0100'; {code}
> An error is thrown because the 'pos' field is changed to 'POSITION'. I guess 
> `SqlCompleter` in the sql-client module might be doing something wrong.
> The error could be reproduced using the attached file.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-28003) An unexpected replacement happened in SqlClient, for example, replacing 'pos ' with 'POSITION'

2022-06-13 Thread Jing Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553870#comment-17553870
 ] 

Jing Zhang commented on FLINK-28003:


As discussed with [~fsk119] offline, we add a configuration option to disable 
SQL completion in SqlClient. Besides, for '-f sqlFile', it makes sense to 
disable SQL completion by default.

> An unexpected replacement happened in SqlClient, for example, replacing 
> 'pos  ' with 'POSITION'
> --
>
> Key: FLINK-28003
> URL: https://issues.apache.org/jira/browse/FLINK-28003
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.15.0
>Reporter: Jing Zhang
>Assignee: Shengkai Fang
>Priority: Major
> Attachments: zj_test.sql
>
>
> When I run the following sql in SqlClient using 'sql-client.sh -f zj_test.sql'
> {code:java}
> create table if not exists db.zj_test(
> pos                   int,
> rank_cmd              string
> )
> partitioned by (
> `p_date` string,
> `p_hourmin` string);
> INSERT OVERWRITE TABLE db.zj_test PARTITION (p_date='20220605', p_hourmin = 
> '0100')
> SELECT
> pos ,
> rank_cmd
> FROM db.sourceT
> where p_date = '20220605' and p_hourmin = '0100'; {code}
> An error is thrown because the 'pos' field is changed to 'POSITION'. I guess 
> `SqlCompleter` in the sql-client module might be doing something wrong.
> The error could be reproduced using the attached file.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-28003) An unexpected replacement happened in SqlClient, for example, replacing 'pos ' with 'POSITION'

2022-06-13 Thread Jing Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jing Zhang reassigned FLINK-28003:
--

Assignee: Shengkai Fang

> An unexpected replacement happened in SqlClient, for example, replacing 
> 'pos  ' with 'POSITION'
> --
>
> Key: FLINK-28003
> URL: https://issues.apache.org/jira/browse/FLINK-28003
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.15.0
>Reporter: Jing Zhang
>Assignee: Shengkai Fang
>Priority: Major
> Attachments: zj_test.sql
>
>
> When I run the following sql in SqlClient using 'sql-client.sh -f zj_test.sql'
> {code:java}
> create table if not exists db.zj_test(
> pos                   int,
> rank_cmd              string
> )
> partitioned by (
> `p_date` string,
> `p_hourmin` string);
> INSERT OVERWRITE TABLE db.zj_test PARTITION (p_date='20220605', p_hourmin = 
> '0100')
> SELECT
> pos ,
> rank_cmd
> FROM db.sourceT
> where p_date = '20220605' and p_hourmin = '0100'; {code}
> An error is thrown because the 'pos' field is changed to 'POSITION'. I guess 
> `SqlCompleter` in the sql-client module might be doing something wrong.
> The error could be reproduced using the attached file.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] lsyldliu commented on pull request #19860: [FLINK-27658][table] FlinkUserCodeClassLoader expose addURL method to allow to register jar dynamically

2022-06-13 Thread GitBox


lsyldliu commented on PR #19860:
URL: https://github.com/apache/flink/pull/19860#issuecomment-1154661119

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27944) IO metrics collision happens if a task has union inputs

2022-06-13 Thread Zhu Zhu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-27944:

Fix Version/s: 1.15.2
   (was: 1.15.1)

> IO metrics collision happens if a task has union inputs
> ---
>
> Key: FLINK-27944
> URL: https://issues.apache.org/jira/browse/FLINK-27944
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.15.0
>Reporter: Zhu Zhu
>Priority: Critical
> Fix For: 1.16.0, 1.15.2
>
>
> When a task has union inputs, some IO metrics (numBytesIn* and numBuffersIn*) 
> of the different inputs may collide and fail to be registered.
>  
> The problem can be reproduced with a simple job like:
> {code:java}
> DataStream<String> source1 = env.fromElements("abc");
> DataStream<String> source2 = env.fromElements("123");
> source1.union(source2).print();{code}
>  
> Logs of collisions:
> {code:java}
> 2022-06-08 00:59:01,629 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'numBytesInLocal'. Metric will not be reported.[, taskmanager, 
> fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, 
> Shuffle, Netty, Input]
> 2022-06-08 00:59:01,629 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'numBytesInLocalPerSecond'. Metric will not be reported.[, taskmanager, 
> fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, 
> Shuffle, Netty, Input]
> 2022-06-08 00:59:01,629 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'numBytesInLocal'. Metric will not be reported.[, taskmanager, 
> fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
> 2022-06-08 00:59:01,629 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'numBytesInLocalPerSecond'. Metric will not be reported.[, taskmanager, 
> fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
> 2022-06-08 00:59:01,630 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'numBytesInRemote'. Metric will not be reported.[, taskmanager, 
> fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, 
> Shuffle, Netty, Input]
> 2022-06-08 00:59:01,630 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'numBytesInRemotePerSecond'. Metric will not be reported.[, taskmanager, 
> fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, 
> Shuffle, Netty, Input]
> 2022-06-08 00:59:01,630 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'numBytesInRemote'. Metric will not be reported.[, taskmanager, 
> fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
> 2022-06-08 00:59:01,630 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'numBytesInRemotePerSecond'. Metric will not be reported.[, taskmanager, 
> fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
> 2022-06-08 00:59:01,630 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'numBuffersInLocal'. Metric will not be reported.[, taskmanager, 
> fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, 
> Shuffle, Netty, Input]
> 2022-06-08 00:59:01,630 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'numBuffersInLocalPerSecond'. Metric will not be reported.[, 
> taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to 
> Std. Out, 0, Shuffle, Netty, Input]
> 2022-06-08 00:59:01,630 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'numBuffersInLocal'. Metric will not be reported.[, taskmanager, 
> fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
> 2022-06-08 00:59:01,630 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'numBuffersInLocalPerSecond'. Metric will not be reported.[, 
> taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, W

[GitHub] [flink] PatrickRen commented on pull request #19828: [FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment

2022-06-13 Thread GitBox


PatrickRen commented on PR #19828:
URL: https://github.com/apache/flink/pull/19828#issuecomment-1154653485

   @becketqin @leonardBang Please take a look when you are available. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28035) Don't check num of buckets for rescale bucket condition

2022-06-13 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan updated FLINK-28035:
--
Summary: Don't check num of buckets for rescale bucket condition  (was: 
Enable scan reading different bucket num for rescale bucket)

> Don't check num of buckets for rescale bucket condition
> ---
>
> Key: FLINK-28035
> URL: https://issues.apache.org/jira/browse/FLINK-28035
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table Store
>Affects Versions: table-store-0.2.0
>Reporter: Jane Chan
>Priority: Major
> Fix For: table-store-0.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] PatrickRen commented on pull request #19828: [FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment

2022-06-13 Thread GitBox


PatrickRen commented on PR #19828:
URL: https://github.com/apache/flink/pull/19828#issuecomment-1154652936

   Thanks for the review @zou-can! Initially I was trying to fix this case at 
the root, so that `KafkaPartitionSplitReader#wakeup` would only wake up the 
blocking `KafkaConsumer#poll` invocation, but I realized that it's not possible 
because the Kafka consumer doesn't expose such an API. I have updated my code 
to add a wrapper around Kafka consumer calls that catches `WakeupException` and 
retries on exception.
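
   As a rough illustration of that approach (a sketch with assumed names, not the actual PR code):

```java
import java.util.function.Supplier;

import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ConsumerCallRetrier {

    private static final Logger LOG = LoggerFactory.getLogger(ConsumerCallRetrier.class);

    private ConsumerCallRetrier() {}

    // Runs a KafkaConsumer call and retries it when it is interrupted by a
    // WakeupException that was actually meant for a concurrent blocking poll.
    public static <T> T retryOnWakeup(Supplier<T> consumerCall, String description) {
        while (true) {
            try {
                return consumerCall.get();
            } catch (WakeupException we) {
                LOG.info("Caught WakeupException while {}. Retrying.", description);
            }
        }
    }
}
```

   A call site would then look like `retryOnWakeup(() -> consumer.position(partition), "getting the consumer position")`.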


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-28003) An unexpected replacement happened in SqlClient, for example, replacing 'pos ' with 'POSITION'

2022-06-13 Thread Jing Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553860#comment-17553860
 ] 

Jing Zhang edited comment on FLINK-28003 at 6/14/22 2:51 AM:
-

[~martijnvisser] Thanks for the response; however, you might have misunderstood here.

I don't use a reserved keyword, I use 'pos', but SqlClient replaces it with 
'position', which leads to an error.

This replacement causes a lot of problems.


was (Author: qingru zhang):
[~martijnvisser] Thanks for the response; however, you might have misunderstood here.

I don't use a reserved keyword, I use pos, but SqlClient replaces it with 
position, which leads to an error.

This replacement causes a lot of problems.

> An unexpected replacement happened in SqlClient, for example, replacing 
> 'pos  ' with 'POSITION'
> --
>
> Key: FLINK-28003
> URL: https://issues.apache.org/jira/browse/FLINK-28003
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.15.0
>Reporter: Jing Zhang
>Priority: Major
> Attachments: zj_test.sql
>
>
> When I run the following sql in SqlClient using 'sql-client.sh -f zj_test.sql'
> {code:java}
> create table if not exists db.zj_test(
> pos                   int,
> rank_cmd              string
> )
> partitioned by (
> `p_date` string,
> `p_hourmin` string);
> INSERT OVERWRITE TABLE db.zj_test PARTITION (p_date='20220605', p_hourmin = 
> '0100')
> SELECT
> pos ,
> rank_cmd
> FROM db.sourceT
> where p_date = '20220605' and p_hourmin = '0100'; {code}
> An error is thrown because the 'pos' field is changed to 'POSITION'. I guess 
> `SqlCompleter` in the sql-client module might be doing something wrong.
> The error could be reproduced using the attached file.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28035) Enable scan reading different bucket num for rescale bucket

2022-06-13 Thread Jane Chan (Jira)
Jane Chan created FLINK-28035:
-

 Summary: Enable scan reading different bucket num for rescale 
bucket
 Key: FLINK-28035
 URL: https://issues.apache.org/jira/browse/FLINK-28035
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.2.0
Reporter: Jane Chan
 Fix For: table-store-0.2.0






--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-28003) An unexpected replacement happened in SqlClient, for example, replacing 'pos ' with 'POSITION'

2022-06-13 Thread Jing Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553860#comment-17553860
 ] 

Jing Zhang commented on FLINK-28003:


[~martijnvisser] Thanks for the response; however, you might have misunderstood here.

I don't use a reserved keyword, I use pos, but SqlClient replaces it with 
position, which leads to an error.

This replacement causes a lot of problems.

> An unexpected replacement happened in SqlClient, for example, replacing 
> 'pos  ' with 'POSITION'
> --
>
> Key: FLINK-28003
> URL: https://issues.apache.org/jira/browse/FLINK-28003
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.15.0
>Reporter: Jing Zhang
>Priority: Major
> Attachments: zj_test.sql
>
>
> When I run the following sql in SqlClient using 'sql-client.sh -f zj_test.sql'
> {code:java}
> create table if not exists db.zj_test(
> pos                   int,
> rank_cmd              string
> )
> partitioned by (
> `p_date` string,
> `p_hourmin` string);
> INSERT OVERWRITE TABLE db.zj_test PARTITION (p_date='20220605', p_hourmin = 
> '0100')
> SELECT
> pos ,
> rank_cmd
> FROM db.sourceT
> where p_date = '20220605' and p_hourmin = '0100'; {code}
> An error is thrown because the 'pos' field is changed to 'POSITION'. I guess 
> `SqlCompleter` in the sql-client module might be doing something wrong.
> The error could be reproduced using the attached file.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-28003) An unexpected replacement happened in SqlClient, for example, replacing 'pos ' with 'POSITION'

2022-06-13 Thread Jing Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jing Zhang updated FLINK-28003:
---
Summary: An unexpected replacement happened in SqlClient, for example, 
replacing 'pos  ' with 'POSITION'  (was: 'pos  ' field would be updated to 
'POSITION' when using SqlClient)

> An unexpected replacement happened in SqlClient, for example, replacing 
> 'pos  ' with 'POSITION'
> --
>
> Key: FLINK-28003
> URL: https://issues.apache.org/jira/browse/FLINK-28003
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.15.0
>Reporter: Jing Zhang
>Priority: Major
> Attachments: zj_test.sql
>
>
> When I run the following sql in SqlClient using 'sql-client.sh -f zj_test.sql'
> {code:java}
> create table if not exists db.zj_test(
> pos                   int,
> rank_cmd              string
> )
> partitioned by (
> `p_date` string,
> `p_hourmin` string);
> INSERT OVERWRITE TABLE db.zj_test PARTITION (p_date='20220605', p_hourmin = 
> '0100')
> SELECT
> pos ,
> rank_cmd
> FROM db.sourceT
> where p_date = '20220605' and p_hourmin = '0100'; {code}
> An error is thrown because the 'pos' field is changed to 'POSITION'. I guess 
> `SqlCompleter` in the sql-client module might be doing something wrong.
> The error could be reproduced using the attached file.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Reopened] (FLINK-28003) 'pos ' field would be updated to 'POSITION' when using SqlClient

2022-06-13 Thread Jing Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jing Zhang reopened FLINK-28003:


> 'pos  ' field would be updated to 'POSITION' when using SqlClient
> ---
>
> Key: FLINK-28003
> URL: https://issues.apache.org/jira/browse/FLINK-28003
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.15.0
>Reporter: Jing Zhang
>Priority: Major
> Attachments: zj_test.sql
>
>
> When I run the following sql in SqlClient using 'sql-client.sh -f zj_test.sql'
> {code:java}
> create table if not exists db.zj_test(
> pos                   int,
> rank_cmd              string
> )
> partitioned by (
> `p_date` string,
> `p_hourmin` string);
> INSERT OVERWRITE TABLE db.zj_test PARTITION (p_date='20220605', p_hourmin = 
> '0100')
> SELECT
> pos ,
> rank_cmd
> FROM db.sourceT
> where p_date = '20220605' and p_hourmin = '0100'; {code}
> An error is thrown because the 'pos' field is changed to 'POSITION'. I guess 
> `SqlCompleter` in the sql-client module might be doing something wrong.
> The error could be reproduced using the attached file.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27951) Translate the "Debugging Classloading" page into Chinese

2022-06-13 Thread Zili Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553859#comment-17553859
 ] 

Zili Sun commented on FLINK-27951:
--

[~martijnvisser] I have done the work and created a PR. Could you help review 
it, please? Thanks!

> Translate the "Debugging Classloading" page into Chinese
> 
>
> Key: FLINK-27951
> URL: https://issues.apache.org/jira/browse/FLINK-27951
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation
>Reporter: Zili Sun
>Assignee: Zili Sun
>Priority: Minor
>  Labels: pull-request-available
>
> The page "Debugging Classloading" needs to be translated into Chinese.
> I'm willing to work on this issue.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28034) ClassCastException occurred in creating a checkpoint with merge windows

2022-06-13 Thread Takayuki Eimizu (Jira)
Takayuki Eimizu created FLINK-28034:
---

 Summary: ClassCastException occurred in creating a checkpoint with 
merge windows 
 Key: FLINK-28034
 URL: https://issues.apache.org/jira/browse/FLINK-28034
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Takayuki Eimizu


h1. Summary

In Flink 1.15.0, the combination of the following features always causes a 
ClassCastException.
 - Session Window
 - Checkpoint
 - Keyed State

The following repository provides minimal source code that combines these 
features to reproduce the exception:

[https://github.com/t-eimizu/flink-checkpoint-with-merging-window]

 
h1. Description
h2. How the Exception Occurred
 
In the process window function of the session window, we must use 
`context.globalState()` instead of `context.windowState()`. If you use 
`context.windowState()` in this situation, Flink throws 
`UnsupportedOperationException`.
 
So we have to do the following:
 
{code:java}
   stPreviousValue = context.globalState().getState(desc4PreviousValue); 
{code}
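
For illustration, a minimal sketch (class and field names are assumed, simplified from the linked repository) of a session-window function that reads keyed state through `context.globalState()`:
{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class PreviousValueFunction
        extends ProcessWindowFunction<String, String, String, TimeWindow> {

    private final ValueStateDescriptor<String> desc4PreviousValue =
            new ValueStateDescriptor<>("previousValue", String.class);

    @Override
    public void process(String key, Context context, Iterable<String> elements,
                        Collector<String> out) throws Exception {
        // windowState() is unsupported for merging (session) windows,
        // so the keyed state has to come from globalState()
        ValueState<String> stPreviousValue =
                context.globalState().getState(desc4PreviousValue);
        for (String element : elements) {
            out.collect(key + " previous=" + stPreviousValue.value());
            stPreviousValue.update(element);
        }
    }
}
{code}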
 
Then stPreviousValue will have the following fields:
||Field Name||Value||
|currentNamespace|VoidNamespace|
|namespaceSerializer|TimeWindow$Serializer|
As a result, when Flink creates a checkpoint for this job, a 
ClassCastException occurs.
{code:java}
2022-06-14 11:04:57,212 INFO  
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - 
ProcessingData -> Sink: PrintData (1/1)#0 - asynchronous part of checkpoint 1 
could not be completed. java.util.concurrent.ExecutionException: 
java.lang.ClassCastException: class 
org.apache.flink.runtime.state.VoidNamespace cannot be cast to class 
org.apache.flink.streaming.api.windowing.windows.TimeWindow 
(org.apache.flink.runtime.state.VoidNamespace and 
org.apache.flink.streaming.api.windowing.windows.TimeWindow are in unnamed 
module of loader 'app') at 
java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?] at 
java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?] at 
org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:645)
 ~[flink-core-1.15.0.jar:1.15.0] at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
 ~[flink-streaming-java-1.15.0.jar:1.15.0] at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
 ~[flink-streaming-java-1.15.0.jar:1.15.0] at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
 [flink-streaming-java-1.15.0.jar:1.15.0] at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?] at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?] at java.lang.Thread.run(Thread.java:834) [?:?] Caused by: 
java.lang.ClassCastException: class 
org.apache.flink.runtime.state.VoidNamespace cannot be cast to class 
org.apache.flink.streaming.api.windowing.windows.TimeWindow 
(org.apache.flink.runtime.state.VoidNamespace and 
org.apache.flink.streaming.api.windowing.windows.TimeWindow are in unnamed 
module of loader 'app') at 
org.apache.flink.streaming.api.windowing.windows.TimeWindow$Serializer.serialize(TimeWindow.java:130)
 ~[flink-streaming-java-1.15.0.jar:1.15.0] at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:145)
 ~[flink-runtime-1.15.0.jar:1.15.0] at 
org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:116)
 ~[flink-runtime-1.15.0.jar:1.15.0] at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:38)
 ~[flink-runtime-1.15.0.jar:1.15.0] at 
org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.lambda$asyncSnapshot$3(HeapSnapshotStrategy.java:172)
 ~[flink-runtime-1.15.0.jar:1.15.0] at 
org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)
 ~[flink-runtime-1.15.0.jar:1.15.0] at 
org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)
 ~[flink-runtime-1.15.0.jar:1.15.0] at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)
 ~[flink-runtime-1.15.0.jar:1.15.0] at 
java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?] at 
org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:642)
 ~[flink-core-1.15.0.jar:1.15.0] ... 6 more  {code}
h2. Workaround
Turn off checkpointing.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Closed] (FLINK-28029) Checkpoint always hangs when running some jobs

2022-06-13 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-28029.

Resolution: Duplicate

> Checkpoint always hangs when running some jobs
> --
>
> Key: FLINK-28029
> URL: https://issues.apache.org/jira/browse/FLINK-28029
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.3
>Reporter: Pauli Gandhi
>Priority: Major
>
> We have noticed that this Flink job hangs and eventually times out after 2 
> hours, every time at the first checkpoint, after it completes 15/23 
> acknowledgments (65%). There is no CPU activity, yet a number of tasks report 
> 100% back pressure. The problem is peculiar to this job and to slight 
> modifications of it. We have created many Flink jobs in the past and never 
> encountered this issue.
> Here are the things we tried to narrow down the problem
>  * The job runs fine if checkpointing is disabled.
>  * Increasing the number of task managers and parallelism to 2 seems to help 
> the job complete.  However, it stalled again when we sent a larger data set.
>  * Increased taskmanager memory from 4 GB to 16 GB and cpu from 1 to 4 but 
> didn't help.
>  * Sometimes restarting the job manager helps but at other times not.
>  * Breaking up the job into smaller parts helps the job to finish.
>  * Analyzed the thread dump, and it appears all threads are either in a 
> sleeping or waiting state.
> Here are the environment details (see the configuration sketch below):
>  * Flink version 1.14.3
>  * Running Kubernetes
>  * Using RocksDB state backend.
>  * Checkpoint storage is S3 storage using the Presto library
>  * Exactly Once Semantics with unaligned checkpoints enabled.
>  * Checkpoint timeout 2 hours
>  * Maximum concurrent checkpoints is 1
>  * Taskmanager CPU: 4, Slots: 1, Process Size: 12 GB
>  * Using Kafka for input and output
> I have attached the task manager logs, thread dump, and screen shots of the 
> job graph and stalled checkpoint.
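
For reference, a minimal sketch (interval, bucket name, and class name are assumptions, not taken from the attached job) of checkpoint settings matching the environment described above:

{code:java}
import java.time.Duration;

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend and exactly-once checkpoints (the interval is an assumption)
        env.setStateBackend(new EmbeddedRocksDBStateBackend());
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.enableUnalignedCheckpoints();
        checkpointConfig.setCheckpointTimeout(Duration.ofHours(2).toMillis());
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        // "s3p://" selects the Presto-based S3 filesystem; the bucket name is made up
        checkpointConfig.setCheckpointStorage("s3p://my-bucket/checkpoints");
    }
}
{code}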



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Closed] (FLINK-28030) Checkpoint always hangs when running some jobs

2022-06-13 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-28030.

Resolution: Duplicate

> Checkpoint always hangs when running some jobs
> --
>
> Key: FLINK-28030
> URL: https://issues.apache.org/jira/browse/FLINK-28030
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.3
>Reporter: Pauli Gandhi
>Priority: Major
>
> We have noticed that this Flink job hangs and eventually times out after 2 
> hours, every time at the first checkpoint, after it completes 15/23 
> acknowledgments (65%). There is no CPU activity, yet a number of tasks report 
> 100% back pressure. The problem is peculiar to this job and to slight 
> modifications of it. We have created many Flink jobs in the past and never 
> encountered this issue.
> Here are the things we tried to narrow down the problem
>  * The job runs fine if checkpointing is disabled.
>  * Increasing the number of task managers and parallelism to 2 seems to help 
> the job complete.  However, it stalled again when we sent a larger data set.
>  * Increased taskmanager memory from 4 GB to 16 GB and cpu from 1 to 4 but 
> didn't help.
>  * Sometimes restarting the job manager helps but at other times not.
>  * Breaking up the job into smaller parts helps the job to finish.
>  * Analyzed the thread dump, and it appears all threads are either in a 
> sleeping or waiting state.
> Here are the environment details
>  * Flink version 1.14.3
>  * Running Kubernetes
>  * Using RocksDB state backend.
>  * Checkpoint storage is S3 storage using the Presto library
>  * Exactly Once Semantics with unaligned checkpoints enabled.
>  * Checkpoint timeout 2 hours
>  * Maximum concurrent checkpoints is 1
>  * Taskmanager CPU: 4, Slots: 1, Process Size: 12 GB
>  * Using Kafka for input and output
> I have attached the task manager logs, thread dump, and screen shots of the 
> job graph and stalled checkpoint.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Closed] (FLINK-28031) Checkpoint always hangs when running some jobs

2022-06-13 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-28031.

Resolution: Duplicate

> Checkpoint always hangs when running some jobs
> --
>
> Key: FLINK-28031
> URL: https://issues.apache.org/jira/browse/FLINK-28031
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.3
>Reporter: Pauli Gandhi
>Priority: Major
>
> We have noticed that this Flink job hangs and eventually times out after 2 
> hours, every time at the first checkpoint, after it completes 15/23 
> acknowledgments (65%). There is no CPU activity, yet a number of tasks report 
> 100% back pressure. The problem is peculiar to this job and to slight 
> modifications of it. We have created many Flink jobs in the past and never 
> encountered this issue.
> Here are the things we tried to narrow down the problem
>  * The job runs fine if checkpointing is disabled.
>  * Increasing the number of task managers and parallelism to 2 seems to help 
> the job complete.  However, it stalled again when we sent a larger data set.
>  * Increased taskmanager memory from 4 GB to 16 GB and cpu from 1 to 4 but 
> didn't help.
>  * Sometimes restarting the job manager helps but at other times not.
>  * Breaking up the job into smaller parts helps the job to finish.
>  * Analyzed the thread dump, and it appears all threads are either in a 
> sleeping or waiting state.
> Here are the environment details
>  * Flink version 1.14.3
>  * Running Kubernetes
>  * Using RocksDB state backend.
>  * Checkpoint storage is S3 storage using the Presto library
>  * Exactly Once Semantics with unaligned checkpoints enabled.
>  * Checkpoint timeout 2 hours
>  * Maximum concurrent checkpoints is 1
>  * Taskmanager CPU: 4, Slots: 1, Process Size: 12 GB
>  * Using Kafka for input and output
> I have attached the task manager logs, thread dump, and screen shots of the 
> job graph and stalled checkpoint.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] paul8263 commented on pull request #19772: [FLINK-27579][client] The param client.timeout can not be set by dyna…

2022-06-13 Thread GitBox


paul8263 commented on PR #19772:
URL: https://github.com/apache/flink/pull/19772#issuecomment-1154618703

   Hi @wangyang0918 ,
   Could you please review this PR? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28033) find and output new min watermark may be wrong with multiple channels

2022-06-13 Thread YeAble (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553848#comment-17553848
 ] 

YeAble commented on FLINK-28033:


In my mind, it could be done like this:
{code:java}
long newMinWatermark = Long.MAX_VALUE;
boolean hasAlignedChannels = false;

// determine new overall watermark by considering only watermark-aligned
// channels across all channels
for (InputChannelStatus channelStatus : channelStatuses) {
if (channelStatus.isWatermarkAligned  
&& channelStatus.watermark != Long.MIN_VALUE) {
hasAlignedChannels = true;
newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
}
}

// we acknowledge and output the new overall watermark if it really is 
aggregated
// from some remaining aligned channel, and is also larger than the last output 
watermark
if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
lastOutputWatermark = newMinWatermark;
output.emitWatermark(new Watermark(lastOutputWatermark));
}
{code}
I'm not sure this is right, so I reported this question in Jira to find help.
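
For illustration, here is a self-contained sketch of how that guard changes 
the outcome. ChannelStatus below is a hypothetical stand-in for Flink's 
internal InputChannelStatus, not the real class:

{code:java}
import java.util.Arrays;
import java.util.List;

public class GuardedMinWatermarkDemo {
    // Hypothetical stand-in for Flink's internal InputChannelStatus.
    static class ChannelStatus {
        long watermark = Long.MIN_VALUE; // initial value, as in Flink
        boolean isWatermarkAligned = true;
    }

    public static void main(String[] args) {
        ChannelStatus a = new ChannelStatus();
        ChannelStatus b = new ChannelStatus();
        a.watermark = 100L; // only channel A has advanced
        List<ChannelStatus> channels = Arrays.asList(a, b);

        long lastOutputWatermark = Long.MIN_VALUE;
        long newMinWatermark = Long.MAX_VALUE;
        boolean hasAlignedChannels = false;
        for (ChannelStatus status : channels) {
            // proposed guard: ignore channels still at the initial watermark
            if (status.isWatermarkAligned && status.watermark != Long.MIN_VALUE) {
                hasAlignedChannels = true;
                newMinWatermark = Math.min(status.watermark, newMinWatermark);
            }
        }
        if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
            System.out.println("emit watermark " + newMinWatermark); // prints 100
        } else {
            System.out.println("no watermark emitted"); // taken without the guard
        }
    }
}
{code}

Note that skipping channels still at Long.MIN_VALUE changes the semantics: the 
output watermark could then advance past channels that have not emitted any 
watermark yet, which the min-over-all-aligned-channels rule otherwise prevents.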

> find and output new min watermark may be wrong when in multichannel
> -
>
> Key: FLINK-28033
> URL: https://issues.apache.org/jira/browse/FLINK-28033
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Reporter: YeAble
>Priority: Blocker
>
> File: StatusWatermarkValve.java
> Method:  findAndOutputNewMinWatermarkAcrossAlignedChannels
> {code:java}
> // code placeholder
> long newMinWatermark = Long.MAX_VALUE;
> boolean hasAlignedChannels = false;
> // determine new overall watermark by considering only watermark-aligned
> // channels across all channels
> for (InputChannelStatus channelStatus : channelStatuses) {
>     if (channelStatus.isWatermarkAligned) {
>         hasAlignedChannels = true;
>         newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
>     }
> }
> // we acknowledge and output the new overall watermark if it really is
> // aggregated from some remaining aligned channel, and is also larger than
> // the last output watermark
> if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
>     lastOutputWatermark = newMinWatermark;
>     output.emitWatermark(new Watermark(lastOutputWatermark));
> }
> {code}
>  A channel status's watermark is initialized to Long.MIN_VALUE. When one 
> channel's watermark has advanced but another channel's has not, 
> newMinWatermark stays at Long.MIN_VALUE and no watermark is emitted.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] lsyldliu commented on pull request #19860: [FLINK-27658][table] FlinkUserCodeClassLoader expose addURL method to allow to register jar dynamically

2022-06-13 Thread GitBox


lsyldliu commented on PR #19860:
URL: https://github.com/apache/flink/pull/19860#issuecomment-1154606842

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28033) find and output new min watermark may be wrong when in multichannel

2022-06-13 Thread YeAble (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YeAble updated FLINK-28033:
---
Priority: Blocker  (was: Major)

> find and output new min watermark may be wrong when in multichannel
> -
>
> Key: FLINK-28033
> URL: https://issues.apache.org/jira/browse/FLINK-28033
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Reporter: YeAble
>Priority: Blocker
>
> File: StatusWatermarkValve.java
> Method:  findAndOutputNewMinWatermarkAcrossAlignedChannels
> {code:java}
> // code placeholder
> long newMinWatermark = Long.MAX_VALUE;
> boolean hasAlignedChannels = false;
> // determine new overall watermark by considering only watermark-aligned
> // channels across all channels
> for (InputChannelStatus channelStatus : channelStatuses) {
>     if (channelStatus.isWatermarkAligned) {
>         hasAlignedChannels = true;
>         newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
>     }
> }
> // we acknowledge and output the new overall watermark if it really is
> // aggregated from some remaining aligned channel, and is also larger than
> // the last output watermark
> if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
>     lastOutputWatermark = newMinWatermark;
>     output.emitWatermark(new Watermark(lastOutputWatermark));
> }
> {code}
>  A channel status's watermark is initialized to Long.MIN_VALUE. When one 
> channel's watermark has advanced but another channel's has not, 
> newMinWatermark stays at Long.MIN_VALUE and no watermark is emitted.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28033) find and output new min watermark may be wrong when in multichannel

2022-06-13 Thread YeAble (Jira)
YeAble created FLINK-28033:
--

 Summary: find and output new min watermark may be wrong when in multichannel
 Key: FLINK-28033
 URL: https://issues.apache.org/jira/browse/FLINK-28033
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.15.0
Reporter: YeAble


File: StatusWatermarkValve.java

Method:  findAndOutputNewMinWatermarkAcrossAlignedChannels
{code:java}
// code placeholder
long newMinWatermark = Long.MAX_VALUE;
boolean hasAlignedChannels = false;

// determine new overall watermark by considering only watermark-aligned
// channels across all channels
for (InputChannelStatus channelStatus : channelStatuses) {
    if (channelStatus.isWatermarkAligned) {
        hasAlignedChannels = true;
        newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
    }
}

// we acknowledge and output the new overall watermark if it really is
// aggregated from some remaining aligned channel, and is also larger than
// the last output watermark
if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
    lastOutputWatermark = newMinWatermark;
    output.emitWatermark(new Watermark(lastOutputWatermark));
}
{code}
A channel status's watermark is initialized to Long.MIN_VALUE. When one 
channel's watermark has advanced but another channel's has not, 
newMinWatermark stays at Long.MIN_VALUE and no watermark is emitted.
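
A minimal sketch reproducing the described behavior, with plain longs standing 
in for the channel statuses (illustrative only, not Flink code):

{code:java}
public class WatermarkStallDemo {
    public static void main(String[] args) {
        // one channel advanced to 100, the other still at its initial watermark
        long[] channelWatermarks = {100L, Long.MIN_VALUE};
        long lastOutputWatermark = Long.MIN_VALUE;

        long newMinWatermark = Long.MAX_VALUE;
        boolean hasAlignedChannels = false;
        for (long watermark : channelWatermarks) {
            hasAlignedChannels = true;
            // the min is dragged down to Long.MIN_VALUE by the idle channel
            newMinWatermark = Math.min(watermark, newMinWatermark);
        }

        // Long.MIN_VALUE is not strictly greater than lastOutputWatermark,
        // so nothing is emitted until every channel has advanced.
        if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
            System.out.println("emit " + newMinWatermark);
        } else {
            System.out.println("no watermark emitted"); // this branch runs
        }
    }
}
{code}

Holding the watermark back until every aligned channel has advanced is 
consistent with the min-across-inputs watermark model, so the question raised 
here is whether this is a bug or intended behavior.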



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-28033) find and output new min watermark may be wrong when in multichannel

2022-06-13 Thread YeAble (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YeAble updated FLINK-28033:
---
Affects Version/s: (was: 1.15.0)

> find and output new min watermark may be wrong when in multichannel
> -
>
> Key: FLINK-28033
> URL: https://issues.apache.org/jira/browse/FLINK-28033
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Reporter: YeAble
>Priority: Major
>
> File: StatusWatermarkValve.java
> Method:  findAndOutputNewMinWatermarkAcrossAlignedChannels
> {code:java}
> // code placeholder
> long newMinWatermark = Long.MAX_VALUE;
> boolean hasAlignedChannels = false;
> // determine new overall watermark by considering only watermark-aligned
> // channels across all channels
> for (InputChannelStatus channelStatus : channelStatuses) {
>     if (channelStatus.isWatermarkAligned) {
>         hasAlignedChannels = true;
>         newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
>     }
> }
> // we acknowledge and output the new overall watermark if it really is
> // aggregated from some remaining aligned channel, and is also larger than
> // the last output watermark
> if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
>     lastOutputWatermark = newMinWatermark;
>     output.emitWatermark(new Watermark(lastOutputWatermark));
> }
> {code}
>  A channel status's watermark is initialized to Long.MIN_VALUE. When one 
> channel's watermark has advanced but another channel's has not, 
> newMinWatermark stays at Long.MIN_VALUE and no watermark is emitted.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

