[GitHub] [flink] zhoulii commented on pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits
zhoulii commented on PR #19946: URL: https://github.com/apache/flink/pull/19946#issuecomment-1154791356 > > Maybe add a hotfix commit "Rework BinaryInputFormatTest to be based on AssertJ" first. And then add the new test using AssertJ. > > same as @zhuzhurk's previous suggestion, you should put the hotfix in the first commit, and then commit your changes and new tests follow it. Sorry, I don't quite understand this suggestion. Do you mean I should start a new PR to Rework BinaryInputFormatTest first? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-28028) Common secure credential protection mechanism in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-28028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553931#comment-17553931 ] Martijn Visser commented on FLINK-28028: A potential solution was brought forward on the mailing list a while ago, see https://lists.apache.org/thread/ljn994s6xlzhz09ssxmynzotbw1mvt7f > Common secure credential protection mechanism in Flink SQL > -- > > Key: FLINK-28028 > URL: https://issues.apache.org/jira/browse/FLINK-28028 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API, Table SQL / Runtime >Reporter: Jing Ge >Priority: Major > > Currently, the most common way to provide credentials is: > > CREATE TABLE mytable ( > ... > ) > WITH ( > 'connector' = 'kafka', > 'properties.bootstrap.servers' = '...:9092', > 'topic' = '...', > 'properties.ssl.keystore.password' = , > 'properties.ssl.keystore.location' = ..., > 'properties.ssl.truststore.password' = , > 'properties.ssl.truststore.location' = ..., > ); > The credentials can then be read by calling SHOW CREATE TABLE . > We should provide a stronger way to protect the credentials. -- This message was sent by Atlassian Jira (v8.20.7#820007)
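One interim mitigation for the plaintext-credentials pattern shown in the issue is to keep the secret out of the SQL text that lives in scripts and substitute it at submission time. Below is a minimal sketch with a hypothetical `DdlSecretExpander` helper — this is not the mechanism proposed on the mailing list, and the expanded value would still be visible via SHOW CREATE TABLE, which is exactly the gap this ticket is about:

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: expands ${PLACEHOLDER} tokens in a DDL string from a
// secrets map (e.g. loaded from environment variables), so the literal
// password never appears in the SQL scripts kept in version control.
// This is a sketch of a workaround, not Flink's proposed protection mechanism.
public class DdlSecretExpander {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([A-Z0-9_]+)\\}");

    public static String expand(String ddl, Map<String, String> secrets) {
        Matcher m = PLACEHOLDER.matcher(ddl);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = secrets.get(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("missing secret: " + m.group(1));
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String ddl = "'properties.ssl.keystore.password' = '${KEYSTORE_PASSWORD}'";
        // prints: 'properties.ssl.keystore.password' = 's3cret'
        System.out.println(expand(ddl, Map.of("KEYSTORE_PASSWORD", "s3cret")));
    }
}
```

The substitution happens in the client that submits the statement, so only the running cluster ever sees the resolved value.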
[jira] [Updated] (FLINK-28033) find and output new min watermark may be wrong when in multichannel
[ https://issues.apache.org/jira/browse/FLINK-28033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-28033: --- Description: File: StatusWatermarkValve.java Method: findAndOutputNewMinWatermarkAcrossAlignedChannels {code:java}
// code placeholder
long newMinWatermark = Long.MAX_VALUE;
boolean hasAlignedChannels = false;
// determine new overall watermark by considering only watermark-aligned channels across all
// channels
for (InputChannelStatus channelStatus : channelStatuses) {
    if (channelStatus.isWatermarkAligned) {
        hasAlignedChannels = true;
        newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
    }
}
// we acknowledge and output the new overall watermark if it really is aggregated
// from some remaining aligned channel, and is also larger than the last output watermark
if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
    lastOutputWatermark = newMinWatermark;
    output.emitWatermark(new Watermark(lastOutputWatermark));
}
{code} A channelStatus's watermark is initialized to Long.MIN_VALUE. When one channelStatus's watermark has changed but another channelStatus's has not, the newMinWatermark stays Long.MIN_VALUE and no watermark is emitted. was: identical text, except the file was named StatusWatermarkValue.java. > find and output new min watermark may be wrong when in multichannel > - > > Key: FLINK-28033 > URL: https://issues.apache.org/jira/browse/FLINK-28033 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Reporter: YeAble >Priority: Major > > File: StatusWatermarkValve.java > Method: findAndOutputNewMinWatermarkAcrossAlignedChannels > (code as in the Description above) > A channelStatus's watermark is initialized to Long.MIN_VALUE. When one channelStatus's watermark has changed but another channelStatus's has not, the newMinWatermark stays Long.MIN_VALUE and no watermark is emitted.
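The reported behavior can be reproduced outside Flink with a standalone mimic of the quoted aggregation logic (this is a sketch, not Flink's StatusWatermarkValve): as long as any aligned channel is still at its initial Long.MIN_VALUE, the minimum stays Long.MIN_VALUE and the `newMinWatermark > lastOutputWatermark` guard suppresses emission — which matches watermark semantics, since the operator watermark is the minimum across all aligned channels:

```java
// Standalone mimic of the quoted findAndOutputNewMinWatermarkAcrossAlignedChannels
// logic (a sketch, not Flink's StatusWatermarkValve). Returns the watermark to
// emit, or null if the guard suppresses emission.
public class MinWatermarkSketch {
    static Long newMinWatermark(long[] alignedChannelWatermarks, long lastOutputWatermark) {
        long newMin = Long.MAX_VALUE;
        boolean hasAlignedChannels = false;
        for (long w : alignedChannelWatermarks) {
            hasAlignedChannels = true;
            newMin = Math.min(w, newMin);
        }
        // emit only if aggregated from some aligned channel and strictly larger
        // than the last output watermark
        if (hasAlignedChannels && newMin > lastOutputWatermark) {
            return newMin;
        }
        return null;
    }

    public static void main(String[] args) {
        long init = Long.MIN_VALUE; // channels start at Long.MIN_VALUE
        // one channel advanced to 100, the other is still at its initial value:
        System.out.println(newMinWatermark(new long[] {100L, init}, init)); // null
        // once every aligned channel has advanced, the minimum is emitted:
        System.out.println(newMinWatermark(new long[] {100L, 50L}, init)); // 50
    }
}
```

So no watermark is emitted until every aligned channel has produced one, by design.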
[GitHub] [flink] shuiqiangchen commented on pull request #19140: [FLINK-25231][python]Update Pyflink to use the new type system
shuiqiangchen commented on PR #19140: URL: https://github.com/apache/flink/pull/19140#issuecomment-1154786882 @HuangXingBo Thank you for your comments. I have updated the PR according to your suggestions, please have a look.
[jira] [Updated] (FLINK-28033) find and output new min watermark may be wrong when in multichannel
[ https://issues.apache.org/jira/browse/FLINK-28033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-28033: --- Priority: Major (was: Blocker) > find and output new min watermark may be wrong when in multichannel > - > > Key: FLINK-28033 > URL: https://issues.apache.org/jira/browse/FLINK-28033 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Reporter: YeAble >Priority: Major
[GitHub] [flink] reswqa commented on a diff in pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits
reswqa commented on code in PR #19946: URL: https://github.com/apache/flink/pull/19946#discussion_r896438515

## flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java:

@@ -73,13 +76,21 @@ public void testCreateInputSplitsWithOneFile() throws IOException {
     FileInputSplit[] inputSplits = inputFormat.createInputSplits(numBlocks);
-    Assert.assertEquals("Returns requested numbers of splits.", numBlocks, inputSplits.length);
-    Assert.assertEquals(
-            "1. split has block size length.", blockSize, inputSplits[0].getLength());
-    Assert.assertEquals(
-            "2. split has block size length.", blockSize, inputSplits[1].getLength());
-    Assert.assertEquals(
-            "3. split has block size length.", blockSize, inputSplits[2].getLength());
+    assertThat(inputSplits.length)

Review Comment: The same problem in the other tests also needs to be fixed.
[GitHub] [flink] reswqa commented on a diff in pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits
reswqa commented on code in PR #19946: URL: https://github.com/apache/flink/pull/19946#discussion_r896430817 ## flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java: ## @@ -154,17 +170,60 @@ public void testGetStatisticsMultiplePaths() throws IOException { inputFormat.setBlockSize(blockSize); BaseStatistics stats = inputFormat.getStatistics(null); -Assert.assertEquals( -"The file size statistics is wrong", -blockSize * (numBlocks1 + numBlocks2), -stats.getTotalInputSize()); + +assertThat(stats.getTotalInputSize()) +.as("The file size statistics is wrong") +.isEqualTo(blockSize * (numBlocks1 + numBlocks2)); +} + +@Test +public void testCreateInputSplitsWithEmptySplit() throws IOException { +final int blockInfoSize = new BlockInfo().getInfoSize(); +final int blockSize = blockInfoSize + 8; +final int numBlocks = 3; +final int minNumSplits = 5; + +// create temporary file with 3 blocks +final File tempFile = +createBinaryInputFile( +"test_create_input_splits_with_empty_split", blockSize, numBlocks); + +final Configuration config = new Configuration(); +config.setLong("input.block_size", blockSize + 10); + +final BinaryInputFormat inputFormat = new MyBinaryInputFormat(); +inputFormat.setFilePath(tempFile.toURI().toString()); +inputFormat.setBlockSize(blockSize); + +inputFormat.configure(config); + +FileInputSplit[] inputSplits = inputFormat.createInputSplits(minNumSplits); + +assertThat(inputSplits.length) +.as("Returns requested numbers of splits.") +.isEqualTo(minNumSplits); + +assertThat(inputSplits[0].getLength()) +.as("1. split has block size length.") Review Comment: IMO, `as` is used to add description when this assert is not pass. The error message will be displayed in the following format: org.opentest4j.AssertionFailedError: [description in as()] So I think the `as` description in these tests should be more accurate, such as "1. split should/must has block size length." -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
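The new test quoted above asserts that when minNumSplits (5) exceeds the number of blocks (3), the trailing splits are created with length 0. That padding behavior can be sketched in a standalone helper (the `splitLengths` method below is hypothetical, not Flink's BinaryInputFormat#createInputSplits):

```java
import java.util.Arrays;

// Hypothetical sketch of the split-padding behavior the quoted test asserts:
// when the caller requests more splits than there are blocks, the remaining
// splits are created empty, starting at the index after the last real split
// (the start index that FLINK-28018 corrects).
public class SplitPaddingSketch {
    static int[] splitLengths(int numBlocks, int blockSize, int minNumSplits) {
        int total = Math.max(numBlocks, minNumSplits);
        int[] lengths = new int[total];
        for (int i = 0; i < numBlocks; i++) {
            lengths[i] = blockSize; // real splits cover one block each
        }
        // indices numBlocks .. total-1 stay 0 (empty splits)
        return lengths;
    }

    public static void main(String[] args) {
        // 3 blocks of 18 bytes, 5 requested splits -> two trailing empty splits
        System.out.println(Arrays.toString(splitLengths(3, 18, 5)));
        // prints: [18, 18, 18, 0, 0]
    }
}
```

With minNumSplits at or below the block count, no empty splits are appended and every split has block-size length.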
[jira] [Commented] (FLINK-28026) Add benchmark module for flink table store
[ https://issues.apache.org/jira/browse/FLINK-28026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553929#comment-17553929 ] Jingsong Lee commented on FLINK-28026: -- [~Aiden Gong] Thanks! > Add benchmark module for flink table store > -- > > Key: FLINK-28026 > URL: https://issues.apache.org/jira/browse/FLINK-28026 > Project: Flink > Issue Type: New Feature > Components: Table Store >Reporter: Aiden Gong >Assignee: Aiden Gong >Priority: Minor > Fix For: table-store-0.2.0 > > > Add benchmark module for flink table store. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Assigned] (FLINK-28026) Add benchmark module for flink table store
[ https://issues.apache.org/jira/browse/FLINK-28026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-28026: Assignee: Aiden Gong > Add benchmark module for flink table store > -- > > Key: FLINK-28026 > URL: https://issues.apache.org/jira/browse/FLINK-28026 > Project: Flink > Issue Type: New Feature > Components: Table Store >Reporter: Aiden Gong >Assignee: Aiden Gong >Priority: Minor > Fix For: table-store-0.2.0 > > > Add benchmark module for flink table store. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Closed] (FLINK-28032) Checkpointing hangs and times out with some jobs
[ https://issues.apache.org/jira/browse/FLINK-28032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang closed FLINK-28032. Resolution: Information Provided The thread dump on the taskmanager is not in a format that is easy for people to read. From my knowledge, the checkpoint lock was not obtained by the task thread, which leads the job to hang. Flink's Jira is not the place to ask user questions; Slack or the user mailing list is a better place to ask. You can find the details on the community page: https://flink.apache.org/community.html > Checkpointing hangs and times out with some jobs > > > Key: FLINK-28032 > URL: https://issues.apache.org/jira/browse/FLINK-28032 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.14.3 > Environment: Here are the environment details > * Flink version 1.14.3 > * Running Kubernetes > * Using RocksDB state backend. > * Checkpoint storage is S3 storage using the Presto library > * Exactly Once Semantics with unaligned checkpoints enabled. > * Checkpoint timeout 2 hours > * Maximum concurrent checkpoints is 1 > * Taskmanager CPU: 4, Slots: 1, Process Size: 12 GB > * Using Kafka for input and output >Reporter: Pauli Gandhi >Priority: Major > Attachments: checkpoint snapshot.png, jobgraph.png, > taskmanager_10.112.55.143_6122-969889_log, > taskmanager_10.112.55.143_6122-969889_thread_dump > > > We have noticed that Flink jobs hang and eventually time out after 2 hours > every time at the first checkpoint after it completes 15/23 (65%) > acknowledgments. There is no cpu/record processing activity, yet there > are a number of tasks reporting 100% back pressure. It is peculiar to this > job and slight modifications of this job. We have created many Flink jobs in > the past and never encountered the issue. > Here are the things we tried to narrow down the problem > * The job runs fine if checkpointing is disabled.
> * Increasing the number of task managers and parallelism to 2 seems to help > the job complete. However, it stalled again when we sent a larger data set. > * Increased taskmanager memory from 4 GB to 16 GB and cpu from 1 to 4 but > didn't help. > * Sometimes restarting the job manager helps but at other times not. > * Breaking up the job into smaller parts helps the job to finish. > * Analyzed the the thread dump and it appears all threads are either in > sleeping or wait state. > I have attached the task manager logs (including debug logs for > checkpointing), thread dump, and screen shots of the job graph and stalled > checkpoint. > Your help in resolving this issue is greatly appreciated. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-28037) Flink SQL Upsert-Kafka can not support Flink1.14.x
[ https://issues.apache.org/jira/browse/FLINK-28037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553927#comment-17553927 ] Martijn Visser commented on FLINK-28037: [~renqs] Can you have a look at this one? > Flink SQL Upsert-Kafka can not support Flink1.14.x > -- > > Key: FLINK-28037 > URL: https://issues.apache.org/jira/browse/FLINK-28037 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.14.0, 1.14.2, 1.14.3, 1.14.4 > Environment: Flink Version: 1.14.0 1.14.2 1.14.3 1.14.4 >Reporter: Jiangfei Liu >Priority: Major > Labels: flink-connector-kafka, upser-kafka > Attachments: kafka-sql.png, kafka-sql2.png > > > In Flink 1.14.x, the Flink SQL upsert-kafka sink cannot write data into a Kafka > topic when the sink buffer flush config is set, e.g. > h5. sink.buffer-flush.max-rows > h5. sink.buffer-flush.interval > In Flink 1.13.x, the Flink SQL upsert-kafka sink can write data into a Kafka topic > with the sink buffer flush config
[jira] [Updated] (FLINK-28037) Flink SQL Upsert-Kafka can not support Flink1.14.x
[ https://issues.apache.org/jira/browse/FLINK-28037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-28037: --- Affects Version/s: (was: 1.14.0) (was: 1.14.2) (was: 1.14.3) > Flink SQL Upsert-Kafka can not support Flink1.14.x > -- > > Key: FLINK-28037 > URL: https://issues.apache.org/jira/browse/FLINK-28037 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.14.4 > Environment: Flink Version: 1.14.0 1.14.2 1.14.3 1.14.4 >Reporter: Jiangfei Liu >Priority: Major > Labels: flink-connector-kafka, upser-kafka > Attachments: kafka-sql.png, kafka-sql2.png > > > In Flink 1.14.x, the Flink SQL upsert-kafka sink cannot write data into a Kafka > topic when the sink buffer flush config is set, e.g. > h5. sink.buffer-flush.max-rows > h5. sink.buffer-flush.interval > In Flink 1.13.x, the Flink SQL upsert-kafka sink can write data into a Kafka topic > with the sink buffer flush config
[GitHub] [flink] reswqa commented on pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits
reswqa commented on PR #19946: URL: https://github.com/apache/flink/pull/19946#issuecomment-1154783029 > Maybe add a hotfix commit "Rework BinaryInputFormatTest to be based on AssertJ" first. And then add the new test using AssertJ. As with @zhuzhurk's previous suggestion, you should put the hotfix in the first commit, and then have your changes and new tests follow it.
[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog
dusukang commented on PR #19741: URL: https://github.com/apache/flink/pull/19741#issuecomment-1154782595 @flinkbot run azure
[GitHub] [flink] reswqa commented on a diff in pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits
reswqa commented on code in PR #19946: URL: https://github.com/apache/flink/pull/19946#discussion_r896422901 ## flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java: ## @@ -73,13 +76,21 @@ public void testCreateInputSplitsWithOneFile() throws IOException { FileInputSplit[] inputSplits = inputFormat.createInputSplits(numBlocks); -Assert.assertEquals("Returns requested numbers of splits.", numBlocks, inputSplits.length); -Assert.assertEquals( -"1. split has block size length.", blockSize, inputSplits[0].getLength()); -Assert.assertEquals( -"2. split has block size length.", blockSize, inputSplits[1].getLength()); -Assert.assertEquals( -"3. split has block size length.", blockSize, inputSplits[2].getLength()); +assertThat(inputSplits.length) Review Comment: ```suggestion assertThat(inputSplits) .as("Returns requested numbers of splits.") .hasSize(numBlocks); ``` ## flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java: ## @@ -154,17 +170,60 @@ public void testGetStatisticsMultiplePaths() throws IOException { inputFormat.setBlockSize(blockSize); BaseStatistics stats = inputFormat.getStatistics(null); -Assert.assertEquals( -"The file size statistics is wrong", -blockSize * (numBlocks1 + numBlocks2), -stats.getTotalInputSize()); + +assertThat(stats.getTotalInputSize()) +.as("The file size statistics is wrong") +.isEqualTo(blockSize * (numBlocks1 + numBlocks2)); +} + +@Test +public void testCreateInputSplitsWithEmptySplit() throws IOException { +final int blockInfoSize = new BlockInfo().getInfoSize(); +final int blockSize = blockInfoSize + 8; +final int numBlocks = 3; +final int minNumSplits = 5; + +// create temporary file with 3 blocks +final File tempFile = +createBinaryInputFile( +"test_create_input_splits_with_empty_split", blockSize, numBlocks); + +final Configuration config = new Configuration(); +config.setLong("input.block_size", blockSize + 10); + +final BinaryInputFormat inputFormat = new 
MyBinaryInputFormat(); +inputFormat.setFilePath(tempFile.toURI().toString()); +inputFormat.setBlockSize(blockSize); + +inputFormat.configure(config); + +FileInputSplit[] inputSplits = inputFormat.createInputSplits(minNumSplits); + +assertThat(inputSplits.length) +.as("Returns requested numbers of splits.") +.isEqualTo(minNumSplits); + +assertThat(inputSplits[0].getLength()) +.as("1. split has block size length.") +.isEqualTo(blockSize); + +assertThat(inputSplits[1].getLength()) +.as("2. split has block size length.") +.isEqualTo(blockSize); + +assertThat(inputSplits[2].getLength()) +.as("3. split has block size length.") +.isEqualTo(blockSize); + +assertThat(inputSplits[3].getLength()).as("4. split has block size length.").isEqualTo(0); + +assertThat(inputSplits[4].getLength()).as("5. split has block size length.").isEqualTo(0); Review Comment: ```suggestion assertThat(inputSplits[4].getLength()).as("5. split should be an empty split.").isZero(); ``` ## flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java: ## @@ -154,17 +170,60 @@ public void testGetStatisticsMultiplePaths() throws IOException { inputFormat.setBlockSize(blockSize); BaseStatistics stats = inputFormat.getStatistics(null); -Assert.assertEquals( -"The file size statistics is wrong", -blockSize * (numBlocks1 + numBlocks2), -stats.getTotalInputSize()); + +assertThat(stats.getTotalInputSize()) +.as("The file size statistics is wrong") +.isEqualTo(blockSize * (numBlocks1 + numBlocks2)); +} + +@Test +public void testCreateInputSplitsWithEmptySplit() throws IOException { +final int blockInfoSize = new BlockInfo().getInfoSize(); +final int blockSize = blockInfoSize + 8; +final int numBlocks = 3; +final int minNumSplits = 5; + +// create temporary file with 3 blocks +final File tempFile = +createBinaryInputFile( +"test_create_input_splits_with_empty_split", blockSize, numBlocks); + +final Configuration config = new Configuration(); +config.setLong("input.block_size", blockSize + 
10); + +final BinaryInputFormat inputFormat = new MyBinaryInputFormat(); +
[GitHub] [flink] fsk119 commented on a diff in pull request #19823: [FLINK-27766][sql-gateway] Introduce the framework of the SqlGatewayService
fsk119 commented on code in PR #19823: URL: https://github.com/apache/flink/pull/19823#discussion_r896426974 ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java: ## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.gateway.service.context; + +import org.apache.flink.client.ClientUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; +import org.apache.flink.table.api.internal.TableEnvironmentInternal; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.FunctionCatalog; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.delegation.Executor; +import org.apache.flink.table.delegation.ExecutorFactory; +import org.apache.flink.table.delegation.Planner; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.PlannerFactoryUtil; +import org.apache.flink.table.gateway.common.endpoint.EndpointVersion; +import org.apache.flink.table.gateway.common.session.SessionHandle; +import org.apache.flink.table.gateway.service.operation.OperationManager; +import org.apache.flink.table.module.ModuleManager; +import org.apache.flink.util.TemporaryClassLoaderContext; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; + +/** + * Context describing a session, it's mainly used for user to open a new session in the backend. If + * client request to open a new session, the backend {@code Executor} will maintain the session + * context map util users close it. 
+ */ +public class SessionContext { + +private static final Logger LOG = LoggerFactory.getLogger(SessionContext.class); + +private final SessionHandle sessionId; +private final EndpointVersion endpointVersion; + +// store all options and use Configuration to build SessionState and TableConfig. +private final Configuration sessionConf; +private final SessionState sessionState; +private final URLClassLoader userClassloader; + +private final OperationManager operationManager; + +private SessionContext( +SessionHandle sessionId, +EndpointVersion endpointVersion, +Configuration sessionConf, +URLClassLoader classLoader, +SessionState sessionState, +OperationManager operationManager) { +this.sessionId = sessionId; +this.endpointVersion = endpointVersion; +this.sessionConf = sessionConf; +this.userClassloader = classLoader; +this.sessionState = sessionState; +this.operationManager = operationManager; +} + +// +// Getter method +// + +public SessionHandle getSessionId() { +return this.sessionId; +} + +public Map getConfigMap() { +return sessionConf.toMap(); +} + +// +// Method to execute commands +// + +/** Close resources, e.g. catalogs. */ +public void close() { +try (TemporaryClassLoaderContext ignored = +TemporaryClassLoaderContext.of(userClassloader)) { Review Comment: I think we
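The truncated close() above wraps its work in a try-with-resources TemporaryClassLoaderContext. A hand-rolled mimic of that context-classloader swap pattern (this sketch is not Flink's implementation — only the pattern is reproduced):

```java
import java.net.URL;
import java.net.URLClassLoader;

// Sketch of the try-with-resources context-classloader swap that the quoted
// SessionContext.close() performs via Flink's TemporaryClassLoaderContext:
// the user classloader is installed for the duration of the block, and the
// previous one is restored afterwards, even if the body throws.
public class ClassLoaderSwapSketch {

    public static final class TemporaryClassLoaderSwap implements AutoCloseable {
        private final ClassLoader previous;

        private TemporaryClassLoaderSwap(ClassLoader previous) {
            this.previous = previous;
        }

        public static TemporaryClassLoaderSwap of(ClassLoader classLoader) {
            Thread current = Thread.currentThread();
            ClassLoader previous = current.getContextClassLoader();
            current.setContextClassLoader(classLoader);
            return new TemporaryClassLoaderSwap(previous);
        }

        @Override
        public void close() {
            // always restore the caller's classloader
            Thread.currentThread().setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        ClassLoader user = new URLClassLoader(new URL[0], original);
        try (TemporaryClassLoaderSwap ignored = TemporaryClassLoaderSwap.of(user)) {
            System.out.println(Thread.currentThread().getContextClassLoader() == user); // true
        }
        System.out.println(Thread.currentThread().getContextClassLoader() == original); // true
    }
}
```

Closing session resources such as catalogs under the user classloader ensures user-provided classes are resolvable during cleanup, while the restore in close() keeps the swap from leaking to unrelated work on the same thread.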
[jira] [Resolved] (FLINK-28038) RocksDB rescaling improvement & rescaling benchmark - umbrella ticket
[ https://issues.apache.org/jira/browse/FLINK-28038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuan Mei resolved FLINK-28038. -- Resolution: Fixed > RocksDB rescaling improvement & rescaling benchmark - umbrella ticket > - > > Key: FLINK-28038 > URL: https://issues.apache.org/jira/browse/FLINK-28038 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.16.0 >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Minor > Fix For: 1.16.0 > > > Placeholder umbrella ticket for RocksDB rescaling improvement. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Assigned] (FLINK-28038) RocksDB rescaling improvement & rescaling benchmark - umbrella ticket
[ https://issues.apache.org/jira/browse/FLINK-28038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuan Mei reassigned FLINK-28038: Assignee: Yanfei Lei > RocksDB rescaling improvement & rescaling benchmark - umbrella ticket > - > > Key: FLINK-28038 > URL: https://issues.apache.org/jira/browse/FLINK-28038 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.16.0 >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Minor > > Placeholder umbrella ticket for RocksDB rescaling improvement.
[jira] [Updated] (FLINK-28038) RocksDB rescaling improvement & rescaling benchmark - umbrella ticket
[ https://issues.apache.org/jira/browse/FLINK-28038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuan Mei updated FLINK-28038: - Fix Version/s: 1.16.0 > RocksDB rescaling improvement & rescaling benchmark - umbrella ticket > - > > Key: FLINK-28038 > URL: https://issues.apache.org/jira/browse/FLINK-28038 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.16.0 >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Minor > Fix For: 1.16.0 > > > Placeholder umbrella ticket for RocksDB rescaling improvement.
[GitHub] [flink] fsk119 commented on a diff in pull request #19823: [FLINK-27766][sql-gateway] Introduce the framework of the SqlGatewayService
fsk119 commented on code in PR #19823: URL: https://github.com/apache/flink/pull/19823#discussion_r896425180 ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java: ## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.gateway.service.session; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.gateway.common.session.SessionEnvironment; +import org.apache.flink.table.gateway.common.session.SessionHandle; +import org.apache.flink.table.gateway.common.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.context.DefaultContext; +import org.apache.flink.table.gateway.service.context.SessionContext; +import org.apache.flink.table.gateway.service.utils.Constants; +import org.apache.flink.table.gateway.service.utils.ThreadUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_KEEPALIVE_TIME; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MIN; + +/** Manage the lifecycle of the {@code Session}. 
*/ +public class SessionManager { + +private static final Logger LOG = LoggerFactory.getLogger(SessionManager.class); + +private final DefaultContext defaultContext; + +private final long idleTimeout; +private final long checkInterval; +private final int maxSessionCount; + +private final Map<SessionHandle, Session> sessions; + +private ExecutorService operationExecutorService; +private ScheduledExecutorService scheduledExecutorService; +private ScheduledFuture<?> timeoutCheckerFuture; + +public SessionManager(DefaultContext defaultContext) { +this.defaultContext = defaultContext; +ReadableConfig conf = defaultContext.getFlinkConfig(); +this.idleTimeout = conf.get(SQL_GATEWAY_SESSION_IDLE_TIMEOUT).toMillis(); +this.checkInterval = conf.get(SQL_GATEWAY_SESSION_CHECK_INTERVAL).toMillis(); +this.maxSessionCount = conf.get(SQL_GATEWAY_SESSION_MAX_NUM); +this.sessions = new ConcurrentHashMap<>(); +} + +public void start() { +if (checkInterval > 0 && idleTimeout > 0) { +scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); +timeoutCheckerFuture = +scheduledExecutorService.scheduleAtFixedRate( +() -> { +LOG.debug( +"Start to cleanup expired sessions, current session count: {}", +sessions.size()); +for (Map.Entry<SessionHandle, Session> entry : +sessions.entrySet()) { +SessionHandle sessionId = entry.getKey(); +Session session = entry.getValue(); +if (isSessionExpired(session)) { +LOG.info("Session {} is expired, close it...", sessionId); +closeSession(session); +
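The cleanup loop quoted in the diff above closes any session whose idle time exceeds the configured idle timeout. A minimal, self-contained sketch of that expiry predicate — `isExpired` and the timestamp bookkeeping here are illustrative simplifications, not the actual Flink SQL Gateway API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SessionExpiry {
    // Hypothetical simplification of SessionManager#isSessionExpired:
    // a session is expired when it has been idle longer than idleTimeout.
    // A non-positive idleTimeout disables expiry, matching the guard in start().
    static boolean isExpired(long lastAccessedAtMillis, long nowMillis, long idleTimeoutMillis) {
        return idleTimeoutMillis > 0 && nowMillis - lastAccessedAtMillis > idleTimeoutMillis;
    }

    public static void main(String[] args) {
        Map<String, Long> sessions = new ConcurrentHashMap<>();
        sessions.put("s1", 0L);      // last touched at t=0
        sessions.put("s2", 9_000L);  // last touched at t=9s
        long now = 10_000L;
        long idleTimeout = 5_000L;
        // Only s1 has been idle for more than 5s at t=10s.
        sessions.forEach((id, last) -> {
            if (isExpired(last, now, idleTimeout)) {
                System.out.println("closing expired session " + id);
            }
        });
    }
}
```

In the real class the check runs periodically on a single-threaded `ScheduledExecutorService`, as the diff shows.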
[jira] [Commented] (FLINK-28035) Don't check num of buckets for rescale bucket condition
[ https://issues.apache.org/jira/browse/FLINK-28035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553917#comment-17553917 ] Jingsong Lee commented on FLINK-28035: -- Can you add some description? > Don't check num of buckets for rescale bucket condition > --- > > Key: FLINK-28035 > URL: https://issues.apache.org/jira/browse/FLINK-28035 > Project: Flink > Issue Type: Sub-task > Components: Table Store >Affects Versions: table-store-0.2.0 >Reporter: Jane Chan >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.2.0 > >
[jira] [Updated] (FLINK-28035) Don't check num of buckets for rescale bucket condition
[ https://issues.apache.org/jira/browse/FLINK-28035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28035: --- Labels: pull-request-available (was: ) > Don't check num of buckets for rescale bucket condition > --- > > Key: FLINK-28035 > URL: https://issues.apache.org/jira/browse/FLINK-28035 > Project: Flink > Issue Type: Sub-task > Components: Table Store >Affects Versions: table-store-0.2.0 >Reporter: Jane Chan >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.2.0 > >
[GitHub] [flink-table-store] JingsongLi commented on pull request #157: [FLINK-28035] Don't check num of buckets for rescale bucket condition
JingsongLi commented on PR #157: URL: https://github.com/apache/flink-table-store/pull/157#issuecomment-1154769567 Why check only for rescale? Should an exception be thrown for normal reading as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-28040) Introduce Trino reader for table store
Jingsong Lee created FLINK-28040: Summary: Introduce Trino reader for table store Key: FLINK-28040 URL: https://issues.apache.org/jira/browse/FLINK-28040 Project: Flink Issue Type: Sub-task Components: Table Store Reporter: Jingsong Lee Fix For: table-store-0.2.0 We can refer to FLINK-27947 to write a Trino reader. See https://trino.io/
[GitHub] [flink] zhoulii commented on a diff in pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits
zhoulii commented on code in PR #19946: URL: https://github.com/apache/flink/pull/19946#discussion_r896417184 ## flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java: ## @@ -160,11 +161,45 @@ public void testGetStatisticsMultiplePaths() throws IOException { stats.getTotalInputSize()); } +@Test +public void testCreateInputSplitsWithEmptySplit() throws IOException { +final int blockInfoSize = new BlockInfo().getInfoSize(); +final int blockSize = blockInfoSize + 8; +final int numBlocks = 3; +final int minNumSplits = 5; + +// create temporary file with 3 blocks +final File tempFile = +createBinaryInputFile( +"test_create_input_splits_with_empty_split", blockSize, numBlocks); + +final Configuration config = new Configuration(); +config.setLong("input.block_size", blockSize + 10); + +final BinaryInputFormat inputFormat = new MyBinaryInputFormat(); +inputFormat.setFilePath(tempFile.toURI().toString()); +inputFormat.setBlockSize(blockSize); + +inputFormat.configure(config); + +FileInputSplit[] inputSplits = inputFormat.createInputSplits(minNumSplits); + +Assert.assertEquals( Review Comment: Thanks for reviewing, the suggestion is very helpful. I have reworked BinaryInputFormatTest to be based on AssertJ; can you take a look when you have time?
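For context, FLINK-28018 itself concerns the index assigned to the empty splits that pad the result up to `minNumSplits` when the file yields fewer real splits. A toy sketch of the intended padding behavior — plain split numbers stand in for `FileInputSplit` objects here, and `padWithEmptySplits` is illustrative, not Flink's actual code:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitPadding {
    // Illustrative only: each split is represented by its split number.
    // The point of the fix is that empty splits must be numbered starting
    // at splits.size(), so the split numbers stay consecutive, rather than
    // continuing from a stale counter left over from block iteration.
    static List<Integer> padWithEmptySplits(List<Integer> splits, int minNumSplits) {
        List<Integer> result = new ArrayList<>(splits);
        for (int id = result.size(); id < minNumSplits; id++) {
            result.add(id); // empty split with a correct, consecutive split number
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> real = List.of(0, 1); // two real splits from the file
        List<Integer> padded = padWithEmptySplits(real, 5);
        System.out.println(padded); // [0, 1, 2, 3, 4]
    }
}
```

The test in the diff exercises exactly this case: 3 real blocks but `minNumSplits = 5`, so two empty splits must be appended with correct indices.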
[jira] [Updated] (FLINK-27947) Introduce Spark Reader for table store
[ https://issues.apache.org/jira/browse/FLINK-27947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated FLINK-27947: - Parent: FLINK-28039 Issue Type: Sub-task (was: New Feature) > Introduce Spark Reader for table store > -- > > Key: FLINK-27947 > URL: https://issues.apache.org/jira/browse/FLINK-27947 > Project: Flink > Issue Type: Sub-task > Components: Table Store >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.2.0 > > > Now that we have a more stable connector interface, we can develop a bit more of the > ecosystem. > Apache Spark is a common batch computing engine, and the most common > scenario is: Flink Streaming writes storage, Spark reads storage. > So we can support Spark's reader.
[jira] [Updated] (FLINK-23399) Add a performance benchmark for statebackend rescaling
[ https://issues.apache.org/jira/browse/FLINK-23399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yanfei Lei updated FLINK-23399: --- Parent: FLINK-28038 Issue Type: Sub-task (was: Improvement) > Add a performance benchmark for statebackend rescaling > -- > > Key: FLINK-23399 > URL: https://issues.apache.org/jira/browse/FLINK-23399 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Affects Versions: 1.14.0 >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Minor > Labels: pull-request-available, stale-assigned > Fix For: 1.16.0 > > > We notice that rescaling is not covered in the current state benchmark, so > we'd like to introduce a benchmark to test the performance of state backend > restore during rescaling in flink-benchmark. > The benchmark process is: > (1) generate some states, > (2) change the parallelism of the operator, and restore from the states > generated before. > The implementation of this benchmark is based on > {{RocksIncrementalCheckpointRescalingTest}}, and *_AverageTime_* is used to > measure the rescaling performance on each subtask. > > And this benchmark does not conflict with > `RocksIncrementalCheckpointRescalingBenchmarkTest` in > PR([#14893|https://github.com/apache/flink/pull/14893]). Compared with > `RocksIncrementalCheckpointRescalingBenchmarkTest`, this benchmark supports > testing rescaling on different state backends, and has a finer granularity.
[jira] [Created] (FLINK-28039) Flink Table Store Ecosystem: Compute Engine Readers
Jingsong Lee created FLINK-28039: Summary: Flink Table Store Ecosystem: Compute Engine Readers Key: FLINK-28039 URL: https://issues.apache.org/jira/browse/FLINK-28039 Project: Flink Issue Type: New Feature Components: Table Store Reporter: Jingsong Lee Fix For: table-store-0.2.0 After some refactoring, we have stable connector interfaces, so we can develop compute engine connectors at a controlled cost. The classic scenario is writing with Flink and reading with other engines. We can have readers for Apache Hive, Apache Spark and Trino.
[jira] [Updated] (FLINK-26560) Make the threshold of the overlap fraction of incremental restoring configurable
[ https://issues.apache.org/jira/browse/FLINK-26560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yanfei Lei updated FLINK-26560: --- Parent: FLINK-28038 Issue Type: Sub-task (was: Improvement) > Make the threshold of the overlap fraction of incremental restoring > configurable > > > Key: FLINK-26560 > URL: https://issues.apache.org/jira/browse/FLINK-26560 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Affects Versions: 1.15.0 >Reporter: Yanfei Lei >Assignee: Yanfei Lei >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > Currently, the threshold of the overlap fraction of incremental restoring, > `OVERLAP_FRACTION_THRESHOLD`, is a hard-coded, fixed value. > > {code:java} > public class RocksDBIncrementalCheckpointUtils { > /** > * The threshold of the overlap fraction of the handle's key-group range > with target key-group > * range to be an initial handle. > */ > private static final double OVERLAP_FRACTION_THRESHOLD = 0.75; > ... > } {code} > > `OVERLAP_FRACTION_THRESHOLD` controls how a state > handle is restored, and different thresholds can affect the performance of restoring. The > behavior of deletion during restoring changed with FLINK-21321, so the old > threshold no longer fits the current situation. > To make it easier to adjust the threshold for different situations, > making `OVERLAP_FRACTION_THRESHOLD` configurable is suggested.
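For background, the fraction being thresholded measures how much of the target key-group range a state handle covers; a handle qualifies as the initial handle when the fraction reaches the (now configurable) threshold. A hedged sketch of such an overlap computation — method and parameter names are illustrative, not the actual `RocksDBIncrementalCheckpointUtils` code:

```java
public class OverlapFraction {
    // Fraction of the target key-group range [targetStart, targetEnd)
    // covered by the handle's range [handleStart, handleEnd).
    static double overlapFraction(int handleStart, int handleEnd, int targetStart, int targetEnd) {
        int intersection = Math.min(handleeEndOr(handleEnd), targetEnd) - Math.max(handleStart, targetStart);
        if (intersection <= 0) {
            return 0.0; // disjoint ranges: no overlap at all
        }
        return intersection / (double) (targetEnd - targetStart);
    }

    // Identity helper kept trivial so the arithmetic above stays readable.
    private static int handleeEndOr(int handleEnd) {
        return handleEnd;
    }

    public static void main(String[] args) {
        double f = overlapFraction(0, 96, 0, 128); // handle covers 96 of 128 target key groups
        System.out.println(f >= 0.75); // meets the default 0.75 threshold exactly
    }
}
```

With a hard-coded 0.75, a handle covering 96 of 128 target key groups barely qualifies; making the threshold configurable lets operators trade clipping work against deletion work during restore.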
[GitHub] [flink] dianfu commented on a diff in pull request #19896: [FLINK-27932][python] align noWatermarks and withWatermarkAlignment API
dianfu commented on code in PR #19896: URL: https://github.com/apache/flink/pull/19896#discussion_r896402211 ## docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md: ## @@ -77,6 +77,19 @@ WatermarkStrategy }) ``` {{< /tab >}} +{{< tab "Python" >}} Review Comment: Could we also update the following page: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/built_in/ ## docs/content.zh/docs/dev/datastream/event-time/generating_watermarks.md: ## @@ -136,6 +149,26 @@ withTimestampsAndWatermarks .addSink(...) ``` {{< /tab >}} +{{< tab "Python" >}} +```python +env = StreamExecutionEnvironment.get_execution_environment() + +# currently read_file is not supported in PyFlink Review Comment: Could we create a ticket for this missing functionality?
[jira] [Created] (FLINK-28038) RocksDB rescaling improvement & rescaling benchmark - umbrella ticket
Yanfei Lei created FLINK-28038: -- Summary: RocksDB rescaling improvement & rescaling benchmark - umbrella ticket Key: FLINK-28038 URL: https://issues.apache.org/jira/browse/FLINK-28038 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Affects Versions: 1.16.0 Reporter: Yanfei Lei Placeholder umbrella ticket for RocksDB rescaling improvement.
[jira] [Assigned] (FLINK-26941) Support Pattern end with notFollowedBy with window
[ https://issues.apache.org/jira/browse/FLINK-26941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu reassigned FLINK-26941: --- Assignee: Yue Ma > Support Pattern end with notFollowedBy with window > -- > > Key: FLINK-26941 > URL: https://issues.apache.org/jira/browse/FLINK-26941 > Project: Flink > Issue Type: Improvement > Components: Library / CEP >Affects Versions: 1.14.4 >Reporter: Yue Ma >Assignee: Yue Ma >Priority: Major > Labels: pull-request-available > Fix For: 1.14.4 > > > Currently a pattern sequence cannot end in notFollowedBy() in Flink CEP. But > in fact, this requirement exists in many scenarios, as mentioned in the > following tickets: > https://issues.apache.org/jira/browse/FLINK-16010 > https://issues.apache.org/jira/browse/FLINK-9431 > Unfortunately, these tickets have not been active for a long time. But we still > think this is an important feature for Flink CEP, so we would like to share > our implementation. > If we want to find the users who created an order but didn't pay within 10 > minutes, we could write: > {code:java} > Pattern.begin('create').notFollowedBy('pay_order').withIn(10min){code} > If we receive the create event but don't receive the pay event within 10 > minutes, then the match will be successful. > The idea of the implementation is basically the same as the design of FLINK-16010. > A Pending state is introduced to represent the state of waiting for a > timeout, and there is a take edge between the Pending node and the Stop node. > When advanceTime finds that the pending node has timed out, it > extracts the timeout sequence and outputs it normally as a successfully matched > sequence.
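To illustrate the semantics outside of Flink CEP and its NFA machinery: the pending-state idea amounts to buffering each "create", dropping it when a matching "pay" arrives, and emitting it as a match once event time advances past its deadline. A toy, non-CEP sketch under those assumptions — all names here are illustrative:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NotFollowedByWindow {
    record Event(String type, String orderId, long ts) {}

    // Returns order ids whose "create" was NOT followed by a "pay"
    // within windowMillis, once the watermark has passed the deadline.
    static List<String> unpaidOrders(List<Event> events, long windowMillis, long watermark) {
        Map<String, Long> pendingCreates = new HashMap<>();
        for (Event e : events) {
            if (e.type().equals("create")) {
                pendingCreates.put(e.orderId(), e.ts()); // enter the "pending" state
            } else if (e.type().equals("pay")) {
                pendingCreates.remove(e.orderId());      // pay arrived in time: no match
            }
        }
        List<String> matches = new ArrayList<>();
        pendingCreates.forEach((id, createTs) -> {
            if (watermark > createTs + windowMillis) {
                matches.add(id); // deadline passed with no pay event: emit as a match
            }
        });
        return matches;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("create", "o1", 0L),
                new Event("create", "o2", 1_000L),
                new Event("pay", "o1", 2_000L));
        // 10-minute window; watermark is well past both deadlines.
        System.out.println(unpaidOrders(events, 600_000L, 700_000L)); // [o2]
    }
}
```

The real implementation differs in that the timeout is detected incrementally in advanceTime via the Pending state, rather than by scanning a buffered list.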
[GitHub] [flink] reswqa commented on a diff in pull request #19927: [FLINK-27903][runtime] Introduce and support HYBRID resultPartitionType
reswqa commented on code in PR #19927: URL: https://github.com/apache/flink/pull/19927#discussion_r896405257 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java: ## @@ -1201,6 +1243,33 @@ private void verifyFractions( delta); } +@Test +public void testSetNonDefaultSlotSharingInHybridMode() { +Configuration configuration = new Configuration(); +// set all edge to HYBRID result partition type. +configuration.set( +ExecutionOptions.BATCH_SHUFFLE_MODE, +BatchShuffleMode.EXPERIMENTAL_ALL_EXCHANGES_HYBRID); + +final StreamGraph streamGraph = createStreamGraphForSlotSharingTest(configuration); +// specify slot sharing group for map1 +streamGraph.getStreamNodes().stream() +.filter(n -> "map1".equals(n.getOperatorName())) +.findFirst() +.get() +.setSlotSharingGroup("testSlotSharingGroup"); + +try { +StreamingJobGraphGenerator.createJobGraph(streamGraph); +fail("hybrid shuffle mode with non default slot sharing group should failed."); +} catch (IllegalStateException e) { +assertTrue( +e.getMessage() +.contains( +"hybrid shuffle mode currently does not support setting slot sharing group")); Review Comment: > This should use assertj's assertThatThrownby() @zentol Nice suggestion, I have fixed it.
[GitHub] [flink] zhuzhurk commented on a diff in pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits
zhuzhurk commented on code in PR #19946: URL: https://github.com/apache/flink/pull/19946#discussion_r896403096 ## flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java: ## @@ -160,11 +161,45 @@ public void testGetStatisticsMultiplePaths() throws IOException { stats.getTotalInputSize()); } +@Test +public void testCreateInputSplitsWithEmptySplit() throws IOException { +final int blockInfoSize = new BlockInfo().getInfoSize(); +final int blockSize = blockInfoSize + 8; +final int numBlocks = 3; +final int minNumSplits = 5; + +// create temporary file with 3 blocks +final File tempFile = +createBinaryInputFile( +"test_create_input_splits_with_empty_split", blockSize, numBlocks); + +final Configuration config = new Configuration(); +config.setLong("input.block_size", blockSize + 10); + +final BinaryInputFormat inputFormat = new MyBinaryInputFormat(); +inputFormat.setFilePath(tempFile.toURI().toString()); +inputFormat.setBlockSize(blockSize); + +inputFormat.configure(config); + +FileInputSplit[] inputSplits = inputFormat.createInputSplits(minNumSplits); + +Assert.assertEquals( Review Comment: Maybe add a hotfix commit "Rework BinaryInputFormatTest to be based on AssertJ" first. And then add the new test using AssertJ.
[GitHub] [flink] zhuzhurk commented on a diff in pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits
zhuzhurk commented on code in PR #19946: URL: https://github.com/apache/flink/pull/19946#discussion_r896402127 ## flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java: ## @@ -160,11 +161,45 @@ public void testGetStatisticsMultiplePaths() throws IOException { stats.getTotalInputSize()); } +@Test +public void testCreateInputSplitsWithEmptySplit() throws IOException { +final int blockInfoSize = new BlockInfo().getInfoSize(); +final int blockSize = blockInfoSize + 8; +final int numBlocks = 3; +final int minNumSplits = 5; + +// create temporary file with 3 blocks +final File tempFile = +createBinaryInputFile( +"test_create_input_splits_with_empty_split", blockSize, numBlocks); + +final Configuration config = new Configuration(); +config.setLong("input.block_size", blockSize + 10); + +final BinaryInputFormat inputFormat = new MyBinaryInputFormat(); +inputFormat.setFilePath(tempFile.toURI().toString()); +inputFormat.setBlockSize(blockSize); + +inputFormat.configure(config); + +FileInputSplit[] inputSplits = inputFormat.createInputSplits(minNumSplits); + +Assert.assertEquals( Review Comment: Makes sense. Ref: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
[GitHub] [flink] godfreyhe commented on pull request #14376: [FLINK-18202][PB format] New Format of protobuf
godfreyhe commented on PR #14376: URL: https://github.com/apache/flink/pull/14376#issuecomment-1154742694 @libenchao, hope you can continue to move this forward.
[GitHub] [flink] Aitozi commented on pull request #19840: [FLINK-24713][Runtime/Coordination] Support the initial delay for SlotManager to wait fo…
Aitozi commented on PR #19840: URL: https://github.com/apache/flink/pull/19840#issuecomment-1154742585 Hi @KarmaGYZ , I have rebased this PR based on the previous commit, please take a look again, thanks.
[GitHub] [flink] reswqa commented on pull request #19882: [hotfix] fix a typo in NetworkBufferTest
reswqa commented on PR #19882: URL: https://github.com/apache/flink/pull/19882#issuecomment-1154728455 @wsry Just a very small hotfix, would you like to take a look?
[GitHub] [flink] reswqa closed pull request #14498: [hotfix][typo]Fix typo in StreamingJobGraphGenerator.areOperatorsChainable
reswqa closed pull request #14498: [hotfix][typo]Fix typo in StreamingJobGraphGenerator.areOperatorsChainable URL: https://github.com/apache/flink/pull/14498
[GitHub] [flink] reswqa commented on pull request #14498: [hotfix][typo]Fix typo in StreamingJobGraphGenerator.areOperatorsChainable
reswqa commented on PR #14498: URL: https://github.com/apache/flink/pull/14498#issuecomment-1154727170 Fixed by FLINK-27903
[GitHub] [flink] flinkbot commented on pull request #19950: [hotfix] [docs] Translate a Chinese comment into English
flinkbot commented on PR #19950: URL: https://github.com/apache/flink/pull/19950#issuecomment-1154726348 ## CI report: * 19abcb256444ba684736e355bf76986a79d5f501 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] diohabara opened a new pull request, #19950: [hotfix] [docs] Translate a Chinese comment into English
diohabara opened a new pull request, #19950: URL: https://github.com/apache/flink/pull/19950 I translated a Chinese comment in the sample code into English so that more people can understand it.
[jira] [Created] (FLINK-28037) Flink SQL Upsert-Kafka can not support Flink1.14.x
Jiangfei Liu created FLINK-28037: Summary: Flink SQL Upsert-Kafka can not support Flink1.14.x Key: FLINK-28037 URL: https://issues.apache.org/jira/browse/FLINK-28037 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.14.4, 1.14.3, 1.14.2, 1.14.0 Environment: Flink Version: 1.14.0 1.14.2 1.14.3 1.14.4 Reporter: Jiangfei Liu Attachments: kafka-sql.png, kafka-sql2.png In Flink 1.14.x, the Flink SQL upsert-kafka sink cannot write data into a Kafka topic when the sink buffer flush options are configured, e.g. sink.buffer-flush.max-rows and sink.buffer-flush.interval. In Flink 1.13.x, the Flink SQL upsert-kafka sink can write data into a Kafka topic with the sink buffer flush options configured.
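For reference, the options in question are set in the table DDL. An illustrative upsert-kafka definition (the topic, bootstrap servers, schema, and option values are placeholders, not taken from the report):

```sql
CREATE TABLE sink_table (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '...',
  'properties.bootstrap.servers' = '...:9092',
  'key.format' = 'json',
  'value.format' = 'json',
  -- the buffering options reported as not taking effect on 1.14.x:
  'sink.buffer-flush.max-rows' = '1000',
  'sink.buffer-flush.interval' = '1 s'
);
```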
[jira] [Updated] (FLINK-15635) Allow passing a ClassLoader to EnvironmentSettings
[ https://issues.apache.org/jira/browse/FLINK-15635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-15635: Parent: FLINK-14055 Issue Type: Sub-task (was: Improvement) > Allow passing a ClassLoader to EnvironmentSettings > -- > > Key: FLINK-15635 > URL: https://issues.apache.org/jira/browse/FLINK-15635 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Timo Walther >Assignee: Francesco Guardiani >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > We had a couple of class loading issues in the past because people forgot to > use the right classloader in {{flink-table}}. The SQL Client executor code > hacks a classloader into the planner process by using {{wrapClassLoader}} that > sets the thread's context classloader. > Instead we should allow passing a class loader to environment settings. This > class loader can be passed to the planner and can be stored in the table > environment, table config, etc. to have a consistent class loading behavior. > Having this in place should replace the need for > {{Thread.currentThread().getContextClassLoader()}} in the entire > {{flink-table}} module.
[jira] [Commented] (FLINK-27828) FlinkKafkaProducer VS KafkaSink
[ https://issues.apache.org/jira/browse/FLINK-27828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553892#comment-17553892 ] Jiangfei Liu commented on FLINK-27828:
--
checkpoint config:

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
env.enableCheckpointing(CHECKPOINT_INTERVAL);
checkpointConfig.setCheckpointingMode(CHECKPOINT_MODE);
checkpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT);
checkpointConfig.setTolerableCheckpointFailureNumber(CHECKPOINT_FAILURE_NUMBER);
env.setRestartStrategy(RESTART_STRATEGY_CONFIGURATION);
checkpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT);
checkpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE_BETWEEN);
checkpointConfig.setExternalizedCheckpointCleanup(CHECKPOINT_EXTERNALIZED_CLEANUP);
checkpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage(HDFS_BASE + CHECKPOINT_BASE_PATH + path));
System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);

parallelism config: 3

> FlinkKafkaProducer VS KafkaSink
> ---
>
> Key: FLINK-27828
> URL: https://issues.apache.org/jira/browse/FLINK-27828
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.14.3
> Reporter: Jiangfei Liu
> Priority: Major
> Attachments: Snipaste_2022-05-25_19-52-11.png
>
> Sorry, my English is bad.
> In Flink 1.14.3, writing 1 data to Kafka:
> with FlinkKafkaProducer it completed in 7s;
> with KafkaSink it completed in 1m40s.
> Why is KafkaSink so slow?
[jira] [Commented] (FLINK-27828) FlinkKafkaProducer VS KafkaSink
[ https://issues.apache.org/jira/browse/FLINK-27828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553891#comment-17553891 ] Jiangfei Liu commented on FLINK-27828:
--
parallelism: 3
checkpoint config:

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
env.enableCheckpointing(CHECKPOINT_INTERVAL);
checkpointConfig.setCheckpointingMode(CHECKPOINT_MODE);
checkpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT);
checkpointConfig.setTolerableCheckpointFailureNumber(CHECKPOINT_FAILURE_NUMBER);
env.setRestartStrategy(RESTART_STRATEGY_CONFIGURATION);
checkpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT);
checkpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE_BETWEEN);
checkpointConfig.setExternalizedCheckpointCleanup(CHECKPOINT_EXTERNALIZED_CLEANUP);
checkpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage(HDFS_BASE + CHECKPOINT_BASE_PATH + path));
System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);

> FlinkKafkaProducer VS KafkaSink
> ---
>
> Key: FLINK-27828
> URL: https://issues.apache.org/jira/browse/FLINK-27828
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.14.3
> Reporter: Jiangfei Liu
> Priority: Major
> Attachments: Snipaste_2022-05-25_19-52-11.png
>
> Sorry, my English is bad.
> In Flink 1.14.3, writing 1 data to Kafka:
> with FlinkKafkaProducer it completed in 7s;
> with KafkaSink it completed in 1m40s.
> Why is KafkaSink so slow?
[jira] (FLINK-27828) FlinkKafkaProducer VS KafkaSink
[ https://issues.apache.org/jira/browse/FLINK-27828 ] Jiangfei Liu deleted comment on FLINK-27828:
--
was (Author: JIRAUSER290004):
parallelism: 3
checkpoint config:

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
env.enableCheckpointing(CHECKPOINT_INTERVAL);
checkpointConfig.setCheckpointingMode(CHECKPOINT_MODE);
checkpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT);
checkpointConfig.setTolerableCheckpointFailureNumber(CHECKPOINT_FAILURE_NUMBER);
env.setRestartStrategy(RESTART_STRATEGY_CONFIGURATION);
checkpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT);
checkpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE_BETWEEN);
checkpointConfig.setExternalizedCheckpointCleanup(CHECKPOINT_EXTERNALIZED_CLEANUP);
checkpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage(HDFS_BASE + CHECKPOINT_BASE_PATH + path));
System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);

> FlinkKafkaProducer VS KafkaSink
> ---
>
> Key: FLINK-27828
> URL: https://issues.apache.org/jira/browse/FLINK-27828
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.14.3
> Reporter: Jiangfei Liu
> Priority: Major
> Attachments: Snipaste_2022-05-25_19-52-11.png
>
> Sorry, my English is bad.
> In Flink 1.14.3, writing 1 data to Kafka:
> with FlinkKafkaProducer it completed in 7s;
> with KafkaSink it completed in 1m40s.
> Why is KafkaSink so slow?
[jira] [Commented] (FLINK-26721) PulsarSourceITCase.testSavepoint failed on azure pipeline
[ https://issues.apache.org/jira/browse/FLINK-26721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553889#comment-17553889 ] godfrey he commented on FLINK-26721: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36637&view=logs&j=aa18c3f6-13b8-5f58-86bb-c1cffb239496&t=502fb6c0-30a2-5e49-c5c2-a00fa3acb203 > PulsarSourceITCase.testSavepoint failed on azure pipeline > - > > Key: FLINK-26721 > URL: https://issues.apache.org/jira/browse/FLINK-26721 > Project: Flink > Issue Type: Bug > Components: Connectors / Pulsar >Affects Versions: 1.16.0 >Reporter: Yun Gao >Priority: Critical > Labels: build-stability > > {code:java} > Mar 18 05:49:52 [ERROR] Tests run: 12, Failures: 1, Errors: 0, Skipped: 0, > Time elapsed: 315.581 s <<< FAILURE! - in > org.apache.flink.connector.pulsar.source.PulsarSourceITCase > Mar 18 05:49:52 [ERROR] > org.apache.flink.connector.pulsar.source.PulsarSourceITCase.testSavepoint(TestEnvironment, > DataStreamSourceExternalContext, CheckpointingMode)[1] Time elapsed: > 140.803 s <<< FAILURE! > Mar 18 05:49:52 java.lang.AssertionError: > Mar 18 05:49:52 > Mar 18 05:49:52 Expecting > Mar 18 05:49:52 > Mar 18 05:49:52 to be completed within 2M. 
> Mar 18 05:49:52 > Mar 18 05:49:52 exception caught while trying to get the future result: > java.util.concurrent.TimeoutException > Mar 18 05:49:52 at > java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) > Mar 18 05:49:52 at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) > Mar 18 05:49:52 at > org.assertj.core.internal.Futures.assertSucceededWithin(Futures.java:109) > Mar 18 05:49:52 at > org.assertj.core.api.AbstractCompletableFutureAssert.internalSucceedsWithin(AbstractCompletableFutureAssert.java:400) > Mar 18 05:49:52 at > org.assertj.core.api.AbstractCompletableFutureAssert.succeedsWithin(AbstractCompletableFutureAssert.java:396) > Mar 18 05:49:52 at > org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase.checkResultWithSemantic(SourceTestSuiteBase.java:766) > Mar 18 05:49:52 at > org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase.restartFromSavepoint(SourceTestSuiteBase.java:399) > Mar 18 05:49:52 at > org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase.testSavepoint(SourceTestSuiteBase.java:241) > Mar 18 05:49:52 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Mar 18 05:49:52 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Mar 18 05:49:52 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Mar 18 05:49:52 at java.lang.reflect.Method.invoke(Method.java:498) > Mar 18 05:49:52 at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) > Mar 18 05:49:52 at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > Mar 18 05:49:52 at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > Mar 18 05:49:52 at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) > Mar 18 05:49:52 at > 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) > Mar 18 05:49:52 at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92) > Mar 18 05:49:52 at > org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) > Mar 18 05:49:52 at > org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) > Mar 18 05:49:52 at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > Mar 18 05:49:52 at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > Mar 18 05:49:52 at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > Mar 18 05:49:52 at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > Mar 18 05:49:52 at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) > Mar 18 05:49:52 at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) > Mar 18 05:49:52 at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$in
[GitHub] [flink] flinkbot commented on pull request #19949: [FLINK-28036][hive] HiveInspectors should use correct writable type to create ConstantObjectInspector
flinkbot commented on PR #19949: URL: https://github.com/apache/flink/pull/19949#issuecomment-1154694926 ## CI report: * b2f5199f5d430f4684e63f5f4d4317690c7f53e2 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] reswqa commented on a diff in pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits
reswqa commented on code in PR #19946: URL: https://github.com/apache/flink/pull/19946#discussion_r896360739

## flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java:

@@ -160,11 +161,45 @@ public void testGetStatisticsMultiplePaths() throws IOException { stats.getTotalInputSize()); }

+@Test
+public void testCreateInputSplitsWithEmptySplit() throws IOException {
+    final int blockInfoSize = new BlockInfo().getInfoSize();
+    final int blockSize = blockInfoSize + 8;
+    final int numBlocks = 3;
+    final int minNumSplits = 5;
+
+    // create temporary file with 3 blocks
+    final File tempFile =
+        createBinaryInputFile(
+            "test_create_input_splits_with_empty_split", blockSize, numBlocks);
+
+    final Configuration config = new Configuration();
+    config.setLong("input.block_size", blockSize + 10);
+
+    final BinaryInputFormat inputFormat = new MyBinaryInputFormat();
+    inputFormat.setFilePath(tempFile.toURI().toString());
+    inputFormat.setBlockSize(blockSize);
+
+    inputFormat.configure(config);
+
+    FileInputSplit[] inputSplits = inputFormat.createInputSplits(minNumSplits);
+
+    Assert.assertEquals(

Review Comment: Are you willing to use AssertJ instead of junit.Assert, so that a subsequent migration to JUnit 5 will be more convenient?
[GitHub] [flink] luoyuxia opened a new pull request, #19949: [FLINK-28036][hive] HiveInspectors should use correct writable type to create ConstantObjectInspector
luoyuxia opened a new pull request, #19949: URL: https://github.com/apache/flink/pull/19949

## What is the purpose of the change
To make `HiveInspectors` use the correct writable type to create ConstantObjectInspector.

## Brief change log
Replace `org.apache.hadoop.io.ByteWritable`, `org.apache.hadoop.io.ShortWritable`, `org.apache.hadoop.io.DoubleWritable` with `org.apache.hadoop.hive.serde2.io.ByteWritable`, `org.apache.hadoop.hive.serde2.io.ShortWritable`, `org.apache.hadoop.hive.serde2.io.DoubleWritable` when creating `WritableConstantByteObjectInspector`, `WritableConstantShortObjectInspector`, `WritableConstantDoubleObjectInspector`.

## Verifying this change
Existing tests and manual verification.

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? N/A
[jira] [Updated] (FLINK-28036) HiveInspectors should use correct writable type to create ConstantObjectInspector
[ https://issues.apache.org/jira/browse/FLINK-28036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28036:
---
Labels: pull-request-available (was: )

> HiveInspectors should use correct writable type to create
> ConstantObjectInspector
> -
>
> Key: FLINK-28036
> URL: https://issues.apache.org/jira/browse/FLINK-28036
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Hive
> Reporter: luoyuxia
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.16.0
>
> In HiveInspectors, we create Hive's ConstantObjectInspector by passing a writable type such as ByteWritable or DoubleWritable; all of these writable classes live in the package org.apache.hadoop.io. But Hive's ConstantObjectInspector may require a different class: for example, WritableConstantDoubleObjectInspector requires org.apache.hadoop.hive.serde2.io.DoubleWritable. When such code is reached, it throws a no-such-method exception:
> org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector.<init>(org.apache.hadoop.io.DoubleWritable).
>
> ByteWritable, ShortWritable and DoubleWritable should be the classes in the package org.apache.hadoop.hive.serde2.io instead of org.apache.hadoop.io.
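The mismatch above is easy to miss because the two writables share a simple class name. A minimal illustration of the distinction, assuming the hadoop-common and hive-serde dependencies are on the classpath (this is not code from the patch):

```java
// Both classes are named DoubleWritable, but only Hive's serde2 variant
// matches the constructor named in the exception message above.
org.apache.hadoop.io.DoubleWritable hadoopWritable =
        new org.apache.hadoop.io.DoubleWritable(1.0d);             // wrong type to pass
org.apache.hadoop.hive.serde2.io.DoubleWritable hiveWritable =
        new org.apache.hadoop.hive.serde2.io.DoubleWritable(1.0d); // expected by
        // WritableConstantDoubleObjectInspector.<init>
```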
[GitHub] [flink] snuyanzin commented on pull request #18779: [hotfix][javadoc] Misprint in javadoc at QuadFunction.java
snuyanzin commented on PR #18779: URL: https://github.com/apache/flink/pull/18779#issuecomment-1154691101 @MartijnVisser sorry for the poke. Could you please have a look once you have time?
[jira] [Closed] (FLINK-27658) FlinkUserCodeClassLoader expose addURL method to allow to register jar dynamically
[ https://issues.apache.org/jira/browse/FLINK-27658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu closed FLINK-27658. --- Assignee: dalongliu Resolution: Fixed Done via: 2e37a06368596ca9ed04366a879f2facfb7ef509 > FlinkUserCodeClassLoader expose addURL method to allow to register jar > dynamically > -- > > Key: FLINK-27658 > URL: https://issues.apache.org/jira/browse/FLINK-27658 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Task, Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: dalongliu >Assignee: dalongliu >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > >
[GitHub] [flink] zhuzhurk merged pull request #19860: [FLINK-27658][table] FlinkUserCodeClassLoader expose addURL method to allow to register jar dynamically
zhuzhurk merged PR #19860: URL: https://github.com/apache/flink/pull/19860
[GitHub] [flink] reswqa commented on a diff in pull request #19849: [FLINK-27767][sql-gateway] Introduce Endpoint API and utils
reswqa commented on code in PR #19849: URL: https://github.com/apache/flink/pull/19849#discussion_r896357754 ## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/session/SessionManagerTest.java: ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.gateway.service.session; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.client.cli.DefaultCLI; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions; +import org.apache.flink.table.gateway.common.session.SessionEnvironment; +import org.apache.flink.table.gateway.common.session.SessionHandle; +import org.apache.flink.table.gateway.common.utils.MockedEndpointVersion; +import org.apache.flink.table.gateway.common.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.context.DefaultContext; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** Test for {@link SessionManager}. */ +public class SessionManagerTest { Review Comment: A small suggestion: I think we should now use Junit5 and assertj in newly introduced tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] reswqa commented on a diff in pull request #19939: [FLINK-27983][FLINK-27984][FLINK-27985] Introduce SupportsStatisticsReport, FileBasedStatisticsReportableDecodingFormat, FlinkRecomp
reswqa commented on code in PR #19939: URL: https://github.com/apache/flink/pull/19939#discussion_r896353277 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemStatisticsReportTest.java: ## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.file.table; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.catalog.CatalogPartitionImpl; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.plan.stats.TableStats; +import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.utils.BatchTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; +import org.apache.flink.table.planner.utils.TableTestUtil; +import org.apache.flink.util.Preconditions; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelVisitor; +import org.apache.calcite.rel.core.TableScan; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** Test for statistics functionality in {@link FileSystemTableSource}. */ +public class FileSystemStatisticsReportTest extends TableTestBase { Review Comment: I think we should now use Junit5 and assertj in newly introduced tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] reswqa commented on a diff in pull request #19939: [FLINK-27983][FLINK-27984][FLINK-27985] Introduce SupportsStatisticsReport, FileBasedStatisticsReportableDecodingFormat, FlinkRecomp
reswqa commented on code in PR #19939: URL: https://github.com/apache/flink/pull/19939#discussion_r896352923 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java: ## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.optimize.program; + +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsStatisticReport; +import org.apache.flink.table.plan.stats.TableStats; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle; +import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter; +import org.apache.flink.table.planner.utils.ShortcutUtils; + +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.logical.LogicalTableScan; + +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED; + +/** + * A FlinkOptimizeProgram that recompute statistics after partition pruning and filter push down. + * + * It's a very heavy operation to get statistics from catalogs or connectors, so this centralized + * way can avoid getting statistics again and again. 
+ */ +public class FlinkRecomputeStatisticsProgram implements FlinkOptimizeProgram { Review Comment: I think we should now use Junit5 and assertj in newly introduced tests -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zhoulii commented on a diff in pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits
zhoulii commented on code in PR #19946: URL: https://github.com/apache/flink/pull/19946#discussion_r896348640

## flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java:

@@ -160,6 +160,44 @@ public void testGetStatisticsMultiplePaths() throws IOException { stats.getTotalInputSize()); }

+@Test
+public void testCreateInputSplitsWithEmptySplit() throws IOException {
+    // create temporary file with 3 blocks
+    final File tempFile = File.createTempFile("binary_input_format_test", "tmp");
+    tempFile.deleteOnExit();

Review Comment: fixed
[jira] [Commented] (FLINK-28034) ClassCastException occurred in creating a checkpoint with merge windows
[ https://issues.apache.org/jira/browse/FLINK-28034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553883#comment-17553883 ] Takayuki Eimizu commented on FLINK-28034:
-
I have already implemented a fix for this issue and will create a pull request shortly.

> ClassCastException occurred in creating a checkpoint with merge windows
>
> Key: FLINK-28034
> URL: https://issues.apache.org/jira/browse/FLINK-28034
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / State Backends
> Affects Versions: 1.15.0
> Reporter: Takayuki Eimizu
> Priority: Major
>
> h1. Summary
> In Flink 1.15.0, the combination of the following features always causes a ClassCastException:
> - Session Window
> - Checkpoint
> - Keyed State
> The following repository provides minimal source code that combines these features to reproduce the exception:
> [https://github.com/t-eimizu/flink-checkpoint-with-merging-window]
>
> h1. Description
> h2. How the Exception Occurred
> In the process window function of a session window, we must use `context.globalState()` instead of `context.windowState()`. If you use `context.windowState()` in this situation, Flink throws `UnsupportedOperationException`.
> So we have to do the following:
> {code:java}
> stPreviousValue = context.globalState().getState(desc4PreviousValue);
> {code}
> Then stPreviousValue will have the following fields:
> || Field Name || Value ||
> | currentNamespace | VoidNamespace |
> | namespaceSerializer | TimeWindow$Serializer |
> As a result, when Flink creates a checkpoint for this job, a ClassCastException occurs:
> {code:java}
> 2022-06-14 11:04:57,212 INFO > org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - > ProcessingData -> Sink: PrintData (1/1)#0 - asynchronous part of checkpoint 1 > could not be completed. 
java.util.concurrent.ExecutionException: > java.lang.ClassCastException: class > org.apache.flink.runtime.state.VoidNamespace cannot be cast to class > org.apache.flink.streaming.api.windowing.windows.TimeWindow > (org.apache.flink.runtime.state.VoidNamespace and > org.apache.flink.streaming.api.windowing.windows.TimeWindow are in unnamed > module of loader 'app') at > java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?] at > java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?] at > org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:645) > ~[flink-core-1.15.0.jar:1.15.0] at > org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:54) > ~[flink-streaming-java-1.15.0.jar:1.15.0] at > org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) > ~[flink-streaming-java-1.15.0.jar:1.15.0] at > org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) > [flink-streaming-java-1.15.0.jar:1.15.0] at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > [?:?] at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > [?:?] at java.lang.Thread.run(Thread.java:834) [?:?] 
Caused by: > java.lang.ClassCastException: class > org.apache.flink.runtime.state.VoidNamespace cannot be cast to class > org.apache.flink.streaming.api.windowing.windows.TimeWindow > (org.apache.flink.runtime.state.VoidNamespace and > org.apache.flink.streaming.api.windowing.windows.TimeWindow are in unnamed > module of loader 'app') at > org.apache.flink.streaming.api.windowing.windows.TimeWindow$Serializer.serialize(TimeWindow.java:130) > ~[flink-streaming-java-1.15.0.jar:1.15.0] at > org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:145) > ~[flink-runtime-1.15.0.jar:1.15.0] at > org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:116) > ~[flink-runtime-1.15.0.jar:1.15.0] at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:38) > ~[flink-runtime-1.15.0.jar:1.15.0] at > org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.lambda$asyncSnapshot$3(HeapSnapshotStrategy.java:172) > ~[flink-runtime-1.15.0.jar:1.15.0] at > org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91) > ~[flink-runtime-1.15.0.jar:1.15.0] at > org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88) > ~[flink-runtime-1.15.0.jar:1.15.0] at > org.apache.flink.runti
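For orientation, the globalState() pattern that triggers the trace above looks roughly like this sketch (class, state, and type names are illustrative, not taken from the reproduction repository):

```java
// Sketch: a ProcessWindowFunction applied to a session (merging) window.
// windowState() throws UnsupportedOperationException for merging windows, so
// the reporter falls back to globalState() -- whose entries are stored under
// VoidNamespace, which the snapshot path then tries to cast to TimeWindow.
public class PreviousValueFunction
        extends ProcessWindowFunction<Long, String, String, TimeWindow> {

    private static final ValueStateDescriptor<Long> desc4PreviousValue =
            new ValueStateDescriptor<>("previousValue", Long.class);

    @Override
    public void process(
            String key, Context context, Iterable<Long> elements, Collector<String> out)
            throws Exception {
        // Keyed state scoped to the key, not the window -- required here
        // because context.windowState() is unsupported for merging windows.
        ValueState<Long> stPreviousValue =
                context.globalState().getState(desc4PreviousValue);
        Long previous = stPreviousValue.value();
        long current = elements.iterator().next();
        out.collect(key + ": " + previous + " -> " + current);
        stPreviousValue.update(current);
    }
}
```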
[GitHub] [flink] PatrickRen commented on pull request #19456: [FLINK-27041][connector/kafka] Catch IllegalStateException in KafkaPartitionSplitReader.fetch() to handle no valid partition case
PatrickRen commented on PR #19456: URL: https://github.com/apache/flink/pull/19456#issuecomment-1154675519 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-28036) HiveInspectors should use correct writable type to create ConstantObjectInspector
[ https://issues.apache.org/jira/browse/FLINK-28036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia updated FLINK-28036: - Description: In HiveInspectors, we create Hive's ConstantObjectInspector by passing a writable type such as ByteWritable or DoubleWritable; all of these writable classes come from the package org.apache.hadoop.io. But Hive's ConstantObjectInspector may require a different class: for example, WritableConstantDoubleObjectInspector requires org.apache.hadoop.hive.serde2.io.DoubleWritable, which belongs to the package org.apache.hadoop.hive.serde2.io. When such code is reached, it throws "no such method exception: org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector.<init>(org.apache.hadoop.io.DoubleWritable)". I found that ByteWritable, ShortWritable and DoubleWritable should be the classes from the package org.apache.hadoop.hive.serde2.io instead of org.apache.hadoop.io. (was: In)

> HiveInspectors should use correct writable type to create
> ConstantObjectInspector
> -
>
> Key: FLINK-28036
> URL: https://issues.apache.org/jira/browse/FLINK-28036
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Hive
> Reporter: luoyuxia
> Priority: Major
> Fix For: 1.16.0
>
> In HiveInspectors, we create Hive's ConstantObjectInspector by passing a writable type such as ByteWritable or DoubleWritable; all of these writable classes come from the package org.apache.hadoop.io. But Hive's ConstantObjectInspector may require a different class: for example, WritableConstantDoubleObjectInspector requires org.apache.hadoop.hive.serde2.io.DoubleWritable, which belongs to the package org.apache.hadoop.hive.serde2.io. When such code is reached, it throws "no such method exception: org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector.<init>(org.apache.hadoop.io.DoubleWritable)".
>
> I found that ByteWritable, ShortWritable and DoubleWritable should be the classes from the package org.apache.hadoop.hive.serde2.io instead of org.apache.hadoop.io.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
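The package mismatch described in FLINK-28036 can be illustrated with a small self-contained sketch. The class names below are hypothetical stand-ins for the real Hadoop/Hive writables (the actual classes live in the Hadoop and Hive jars); only the reflection behavior, which mirrors the reported "no such method" failure, is the point:

```java
// Hypothetical stand-ins: in the real issue these are
// org.apache.hadoop.io.DoubleWritable (generic Hadoop) and
// org.apache.hadoop.hive.serde2.io.DoubleWritable (Hive serde2).
class HadoopIoDoubleWritable { }
class Serde2DoubleWritable { }

// Models WritableConstantDoubleObjectInspector, whose constructor
// accepts only the serde2 variant.
class ConstantInspectorStandIn {
    ConstantInspectorStandIn(Serde2DoubleWritable value) { }
}

public class PackageMismatchDemo {
    // Returns true iff the inspector has a constructor taking argType.
    static boolean constructorExists(Class<?> argType) {
        try {
            ConstantInspectorStandIn.class.getDeclaredConstructor(argType);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The serde2 type is accepted; passing the plain Hadoop type fails
        // the constructor lookup, just like the exception in the report.
        System.out.println("serde2 accepted: " + constructorExists(Serde2DoubleWritable.class));
        System.out.println("hadoop.io accepted: " + constructorExists(HadoopIoDoubleWritable.class));
    }
}
```

Fixing the issue therefore means constructing the constant inspector with the serde2 writable types, not the generic Hadoop ones.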
[GitHub] [flink] dianfu commented on a diff in pull request #19295: [FLINK-26941][cep] Support Pattern end with notFollowedBy with window
dianfu commented on code in PR #19295: URL: https://github.com/apache/flink/pull/19295#discussion_r895598233 ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java: ## @@ -261,15 +261,31 @@ public Collection>, Long>> advanceTime( final NFAState nfaState, final long timestamp) throws Exception { +return advanceTimeAndHandlePendingState(sharedBufferAccessor, nfaState, timestamp).f1; +} + +public Tuple2>>, Collection>, Long>>> +advanceTimeAndHandlePendingState( Review Comment: There is no need to introduce advanceTimeAndHandlePendingState. We could just update the signature of advanceTime if needed. ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java: ## @@ -304,6 +305,18 @@ private State createMiddleStates(final State sinkState) { if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) { // skip notFollow patterns, they are converted into edge conditions +if (currentPattern.getWindowTime() != null Review Comment: should use `windowTime` instead of `currentPattern.getWindowTime()`. This also means that we need to calculate `windowTime` in advance. ## flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java: ## @@ -630,6 +630,102 @@ public String select(Map> pattern) { assertEquals(expected, resultList); } +@Test +public void testNotFollowedByWithIn() throws Exception { Review Comment: Could we support the following pattern? If so, it would be great to add an ITCase. 
``` Pattern.begin('A').notFollowedBy('B').followedBy('C').times(0, 2).withIn(Time.milliseconds(3)) ``` ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java: ## @@ -158,9 +158,10 @@ public static boolean canProduceEmptyMatches(final Pattern pattern) { */ void compileFactory() { if (currentPattern.getQuantifier().getConsumingStrategy() -== Quantifier.ConsumingStrategy.NOT_FOLLOW) { +== Quantifier.ConsumingStrategy.NOT_FOLLOW Review Comment: Currently, the windowTime is the minimum of all window times and so it may happen that the window time is defined in the other Pattern. What about moving this check to the end of this method? ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java: ## @@ -304,6 +305,18 @@ private State createMiddleStates(final State sinkState) { if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) { // skip notFollow patterns, they are converted into edge conditions +if (currentPattern.getWindowTime() != null +&& currentPattern.getWindowTime().toMilliseconds() > 0 +&& sinkState.isFinal()) { +final State notFollow = +createState(currentPattern.getName(), State.StateType.Pending); +final IterativeCondition notCondition = getTakeCondition(currentPattern); +final State stopState = +createStopState(notCondition, currentPattern.getName()); +notFollow.addTake(stopState, notCondition); Review Comment: ```suggestion notFollow.addProceed(stopState, notCondition); ``` ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java: ## @@ -304,6 +305,18 @@ private State createMiddleStates(final State sinkState) { if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) { // skip notFollow patterns, they are converted into edge conditions +if (currentPattern.getWindowTime() != null +&& currentPattern.getWindowTime().toMilliseconds() > 0 +&& sinkState.isFinal()) 
{ Review Comment: ```suggestion && lastSink.isFinal()) { ```
[jira] [Updated] (FLINK-28036) HiveInspectors should use correct writable type to create ConstantObjectInspector
[ https://issues.apache.org/jira/browse/FLINK-28036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia updated FLINK-28036: - Summary: HiveInspectors should use correct writable type to create ConstantObjectInspector (was: Use correct writable type to create ConstantObjectInspector)

> HiveInspectors should use correct writable type to create
> ConstantObjectInspector
> -
>
> Key: FLINK-28036
> URL: https://issues.apache.org/jira/browse/FLINK-28036
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Hive
> Reporter: luoyuxia
> Priority: Major
> Fix For: 1.16.0
>
[jira] [Created] (FLINK-28036) Use correct writable type to create ConstantObjectInspector
luoyuxia created FLINK-28036: Summary: Use correct writable type to create ConstantObjectInspector Key: FLINK-28036 URL: https://issues.apache.org/jira/browse/FLINK-28036 Project: Flink Issue Type: Sub-task Reporter: luoyuxia
[jira] [Updated] (FLINK-28036) Use correct writable type to create ConstantObjectInspector
[ https://issues.apache.org/jira/browse/FLINK-28036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia updated FLINK-28036: - Fix Version/s: 1.16.0

> Use correct writable type to create ConstantObjectInspector
> ---
>
> Key: FLINK-28036
> URL: https://issues.apache.org/jira/browse/FLINK-28036
> Project: Flink
> Issue Type: Sub-task
> Reporter: luoyuxia
> Priority: Major
> Fix For: 1.16.0
>
[jira] [Updated] (FLINK-28036) Use correct writable type to create ConstantObjectInspector
[ https://issues.apache.org/jira/browse/FLINK-28036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia updated FLINK-28036: - Component/s: Connectors / Hive

> Use correct writable type to create ConstantObjectInspector
> ---
>
> Key: FLINK-28036
> URL: https://issues.apache.org/jira/browse/FLINK-28036
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Hive
> Reporter: luoyuxia
> Priority: Major
> Fix For: 1.16.0
>
[jira] [Updated] (FLINK-28036) HiveInspectors should use correct writable type to create ConstantObjectInspector
[ https://issues.apache.org/jira/browse/FLINK-28036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia updated FLINK-28036: - Description: In

> HiveInspectors should use correct writable type to create
> ConstantObjectInspector
> -
>
> Key: FLINK-28036
> URL: https://issues.apache.org/jira/browse/FLINK-28036
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Hive
> Reporter: luoyuxia
> Priority: Major
> Fix For: 1.16.0
>
> In
[jira] [Updated] (FLINK-27395) IllegalStateException: Could not find policy 'pick_first'. on Flink Application
[ https://issues.apache.org/jira/browse/FLINK-27395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Natan HP updated FLINK-27395: - Description: I got this exception on the Flink taskmanager, but I can see that the data is successfully published to Pub/Sub. Here is the log:

{noformat}
2022-04-25 07:53:44,293 INFO org.apache.flink.runtime.taskmanager.Task [] - Sink: Pub-sub-sink (1/2)#0 (cffc263c60c6c0c8c56092a8386601eb) switched from INITIALIZING to RUNNING.
Apr 25, 2022 7:53:45 AM io.grpc.internal.ManagedChannelImpl$1 uncaughtException
SEVERE: [Channel<1>: (pubsub.googleapis.com:443)] Uncaught exception in the SynchronizationContext. Panic!
java.lang.IllegalStateException: Could not find policy 'pick_first'. Make sure its implementation is either registered to LoadBalancerRegistry or included in META-INF/services/io.grpc.LoadBalancerProvider from your jar files.
	at io.grpc.internal.AutoConfiguredLoadBalancerFactory$AutoConfiguredLoadBalancer.<init>(AutoConfiguredLoadBalancerFactory.java:94)
	at io.grpc.internal.AutoConfiguredLoadBalancerFactory.newLoadBalancer(AutoConfiguredLoadBalancerFactory.java:65)
	at io.grpc.internal.ManagedChannelImpl.exitIdleMode(ManagedChannelImpl.java:375)
	at io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider$1ExitIdleModeForTransport.run(ManagedChannelImpl.java:469)
	at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
	at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
	at io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider.get(ManagedChannelImpl.java:473)
	at io.grpc.internal.ClientCallImpl.startInternal(ClientCallImpl.java:253)
	at io.grpc.internal.ClientCallImpl.start(ClientCallImpl.java:210)
	at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:32)
	at com.google.api.gax.grpc.GrpcHeaderInterceptor$1.start(GrpcHeaderInterceptor.java:94)
	at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:314)
	at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:288)
	at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:200)
	at com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58)
	at com.google.api.gax.grpc.GrpcUnaryRequestParamCallable.futureCall(GrpcUnaryRequestParamCallable.java:65)
	at com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64)
	at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:86)
	at com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63)
	at com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41)
	at com.google.api.gax.tracing.TracedBatchingCallable.futureCall(TracedBatchingCallable.java:82)
	at com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79)
	at com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126)
	at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87)
	at com.google.cloud.pubsub.v1.Publisher.publishCall(Publisher.java:425)
	at com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:471)
	at com.google.cloud.pubsub.v1.Publisher.publishAllWithoutInflight(Publisher.java:399)
	at com.google.cloud.pubsub.v1.Publisher.access$1100(Publisher.java:88)
	at com.google.cloud.pubsub.v1.Publisher$2.run(Publisher.java:326)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
{noformat}

The code sample:

{code:java}
SinkFunction<String> pubsubSink =
    PubSubSink.newBuilder()
        .withSerializationSchema(
            (SerializationSchema<String>) s -> s.getBytes(StandardCharsets.UTF_8))
        .withProjectName("")
        .withTopicName("")
        .build();

dataStream.addSink(pubsubSink).name("Pub-sub-sink");
{code}

I use the Maven Assembly Plugin to create the uber JAR:

{noformat}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <version>2.6</version>
  <configuration>
    <archive>
      <manifest>
        <mainClass>org.example.flink.Main</mainClass>
      </manifest>
    </archive>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
  <executions>
    <execution>
      <id>make-assembly</id>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>
{noformat}

The content of the JAR:

{noformat}
➜ jar tf MyApp.jar | grep io.grpc.LoadBalancerProvider
io/grpc/LoadBalancerProvider$UnknownConfig.class
META-INF/services/io.grpc.Lo
{noformat}
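The truncated JAR listing above is the usual symptom of uber-jar assembly clobbering the META-INF/services entries through which gRPC registers its LoadBalancerProvider implementations. A commonly suggested remedy — an assumption here, not a fix confirmed in this ticket — is to build the shaded jar with the maven-shade-plugin and its ServicesResourceTransformer, which merges service files from all dependencies instead of keeping only one:

{code:xml}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- Merge META-INF/services files (e.g. io.grpc.LoadBalancerProvider)
               so the pick_first provider remains registered in the uber jar. -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
{code}

After rebuilding, `jar tf MyApp.jar | grep io.grpc.LoadBalancerProvider` should show a merged META-INF/services/io.grpc.LoadBalancerProvider entry listing all providers.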
[jira] [Closed] (FLINK-27395) IllegalStateException: Could not find policy 'pick_first'. on Flink Application
[ https://issues.apache.org/jira/browse/FLINK-27395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Natan HP closed FLINK-27395. > IllegalStateException: Could not find policy 'pick_first'. on Flink > Application > --- > > Key: FLINK-27395 > URL: https://issues.apache.org/jira/browse/FLINK-27395 > Project: Flink > Issue Type: Bug > Components: Connectors / Google Cloud PubSub >Affects Versions: 1.15.0, 1.14.2, 1.14.4 > Environment: # Minikube > {noformat} > ➜ minikube version > minikube version: v1.25.2 > commit: 362d5fdc0a3dbee389b3d3f1034e8023e72bd3a7 > {noformat} > # Apache Flink Docker Images > {noformat} > apache/flink:1.14.4-scala_2.11{noformat} > {noformat} > apache/flink:1.15.0-scala_2.12-java11{noformat} >Reporter: Natan HP >Priority: Major > > I got this exception on flink taskmanager, but I can see that the data is > successfully published in the pub sub. Here is the log: > > {noformat} > 2022-04-25 07:53:44,293 INFO org.apache.flink.runtime.taskmanager.Task > > [] - Sink: Pub-sub-sink (1/2)#0 (cffc263c60c6c0c8c56092a8386601eb) switched > from INITIALIZING to RUNNING. > Apr 25, 2022 7:53:45 AM io.grpc.internal.ManagedChannelImpl$1 > uncaughtException > SEVERE: [Channel<1>: (pubsub.googleapis.com:443)] Uncaught exception in the > SynchronizationContext. Panic! > java.lang.IllegalStateException: Could not find policy 'pick_first'. Make > sure its implementation is either registered to LoadBalancerRegistry or > included in META-INF/services/io.grpc.LoadBalancerProvider from your jar > files. 
> at > io.grpc.internal.AutoConfiguredLoadBalancerFactory$AutoConfiguredLoadBalancer.(AutoConfiguredLoadBalancerFactory.java:94) > at > io.grpc.internal.AutoConfiguredLoadBalancerFactory.newLoadBalancer(AutoConfiguredLoadBalancerFactory.java:65) > at > io.grpc.internal.ManagedChannelImpl.exitIdleMode(ManagedChannelImpl.java:375) > at > io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider$1ExitIdleModeForTransport.run(ManagedChannelImpl.java:469) > at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95) > at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127) > at > io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider.get(ManagedChannelImpl.java:473) > at io.grpc.internal.ClientCallImpl.startInternal(ClientCallImpl.java:253) > at io.grpc.internal.ClientCallImpl.start(ClientCallImpl.java:210) > at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:32) > at > com.google.api.gax.grpc.GrpcHeaderInterceptor$1.start(GrpcHeaderInterceptor.java:94) > at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:314) > at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:288) > at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:200) > at > com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58) > at > com.google.api.gax.grpc.GrpcUnaryRequestParamCallable.futureCall(GrpcUnaryRequestParamCallable.java:65) > at > com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64) > at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:86) > at > com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63) > at > com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41) > at > com.google.api.gax.tracing.TracedBatchingCallable.futureCall(TracedBatchingCallable.java:82) > at > com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79) > at > 
com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126) > at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87) > at com.google.cloud.pubsub.v1.Publisher.publishCall(Publisher.java:425) > at > com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:471) > at > com.google.cloud.pubsub.v1.Publisher.publishAllWithoutInflight(Publisher.java:399) > at com.google.cloud.pubsub.v1.Publisher.access$1100(Publisher.java:88) > at com.google.cloud.pubsub.v1.Publisher$2.run(Publisher.java:326) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[jira] [Resolved] (FLINK-27395) IllegalStateException: Could not find policy 'pick_first'. on Flink Application
[ https://issues.apache.org/jira/browse/FLINK-27395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Natan HP resolved FLINK-27395. -- Resolution: Resolved > IllegalStateException: Could not find policy 'pick_first'. on Flink > Application > --- > > Key: FLINK-27395 > URL: https://issues.apache.org/jira/browse/FLINK-27395 > Project: Flink > Issue Type: Bug > Components: Connectors / Google Cloud PubSub >Affects Versions: 1.15.0, 1.14.2, 1.14.4 > Environment: # Minikube > {noformat} > ➜ minikube version > minikube version: v1.25.2 > commit: 362d5fdc0a3dbee389b3d3f1034e8023e72bd3a7 > {noformat} > # Apache Flink Docker Images > {noformat} > apache/flink:1.14.4-scala_2.11{noformat} > {noformat} > apache/flink:1.15.0-scala_2.12-java11{noformat} >Reporter: Natan HP >Priority: Major > > I got this exception on flink taskmanager, but I can see that the data is > successfully published in the pub sub. Here is the log: > > {noformat} > 2022-04-25 07:53:44,293 INFO org.apache.flink.runtime.taskmanager.Task > > [] - Sink: Pub-sub-sink (1/2)#0 (cffc263c60c6c0c8c56092a8386601eb) switched > from INITIALIZING to RUNNING. > Apr 25, 2022 7:53:45 AM io.grpc.internal.ManagedChannelImpl$1 > uncaughtException > SEVERE: [Channel<1>: (pubsub.googleapis.com:443)] Uncaught exception in the > SynchronizationContext. Panic! > java.lang.IllegalStateException: Could not find policy 'pick_first'. Make > sure its implementation is either registered to LoadBalancerRegistry or > included in META-INF/services/io.grpc.LoadBalancerProvider from your jar > files. 
> at > io.grpc.internal.AutoConfiguredLoadBalancerFactory$AutoConfiguredLoadBalancer.(AutoConfiguredLoadBalancerFactory.java:94) > at > io.grpc.internal.AutoConfiguredLoadBalancerFactory.newLoadBalancer(AutoConfiguredLoadBalancerFactory.java:65) > at > io.grpc.internal.ManagedChannelImpl.exitIdleMode(ManagedChannelImpl.java:375) > at > io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider$1ExitIdleModeForTransport.run(ManagedChannelImpl.java:469) > at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95) > at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127) > at > io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider.get(ManagedChannelImpl.java:473) > at io.grpc.internal.ClientCallImpl.startInternal(ClientCallImpl.java:253) > at io.grpc.internal.ClientCallImpl.start(ClientCallImpl.java:210) > at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:32) > at > com.google.api.gax.grpc.GrpcHeaderInterceptor$1.start(GrpcHeaderInterceptor.java:94) > at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:314) > at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:288) > at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:200) > at > com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58) > at > com.google.api.gax.grpc.GrpcUnaryRequestParamCallable.futureCall(GrpcUnaryRequestParamCallable.java:65) > at > com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64) > at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:86) > at > com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63) > at > com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41) > at > com.google.api.gax.tracing.TracedBatchingCallable.futureCall(TracedBatchingCallable.java:82) > at > com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79) > at > 
com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126) > at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87) > at com.google.cloud.pubsub.v1.Publisher.publishCall(Publisher.java:425) > at > com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:471) > at > com.google.cloud.pubsub.v1.Publisher.publishAllWithoutInflight(Publisher.java:399) > at com.google.cloud.pubsub.v1.Publisher.access$1100(Publisher.java:88) > at com.google.cloud.pubsub.v1.Publisher$2.run(Publisher.java:326) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(
[GitHub] [flink] zhuzhurk commented on a diff in pull request #19946: [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits
zhuzhurk commented on code in PR #19946: URL: https://github.com/apache/flink/pull/19946#discussion_r896335558 ## flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java: ## @@ -160,6 +160,44 @@ public void testGetStatisticsMultiplePaths() throws IOException { stats.getTotalInputSize()); } +@Test +public void testCreateInputSplitsWithEmptySplit() throws IOException { +// create temporary file with 3 blocks +final File tempFile = File.createTempFile("binary_input_format_test", "tmp"); +tempFile.deleteOnExit(); Review Comment: The `tempFile` can be deleted only if the whole JVM exits. I would suggest to use `TemporaryFolder` which can be cleaned after the test method finishes. `FlinkUserCodeClassLoadersTest` can be an example to reference.
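The review comment above boils down to: clean the test fixture up when the test method ends, not at JVM shutdown. JUnit's `TemporaryFolder` rule does this automatically; the plain-Java sketch below illustrates the same pattern with stdlib calls only (the file name and sizes are illustrative, not taken from the PR):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class TempDirCleanupDemo {
    public static void main(String[] args) throws IOException {
        // Create the fixture in a fresh temp directory instead of relying
        // on File.deleteOnExit(), which only runs at JVM shutdown.
        Path tempDir = Files.createTempDirectory("binary_input_format_test");
        try {
            Path tempFile = tempDir.resolve("blocks.tmp");
            Files.write(tempFile, new byte[3 * 64]); // stand-in for the "3 blocks" fixture
            // ... run the code under test against tempFile here ...
        } finally {
            // Delete children before the directory itself (deepest paths first),
            // so the cleanup happens as soon as the "test" finishes.
            try (Stream<Path> walk = Files.walk(tempDir)) {
                walk.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
            }
        }
        if (Files.exists(tempDir)) {
            throw new IllegalStateException("temp dir should have been removed");
        }
        System.out.println("temp dir cleaned up");
    }
}
```

With JUnit 4, `@Rule public TemporaryFolder tmp = new TemporaryFolder();` plus `tmp.newFile(...)` replaces all of the manual bookkeeping above.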
[jira] [Updated] (FLINK-27395) IllegalStateException: Could not find policy 'pick_first'. on Flink Application
[ https://issues.apache.org/jira/browse/FLINK-27395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Natan HP updated FLINK-27395: - Description: I got this exception on flink taskmanager, but I can see that the data is successfully published in the pub sub. Here is the log: {noformat} 2022-04-25 07:53:44,293 INFO org.apache.flink.runtime.taskmanager.Task [] - Sink: Pub-sub-sink (1/2)#0 (cffc263c60c6c0c8c56092a8386601eb) switched from INITIALIZING to RUNNING. Apr 25, 2022 7:53:45 AM io.grpc.internal.ManagedChannelImpl$1 uncaughtException SEVERE: [Channel<1>: (pubsub.googleapis.com:443)] Uncaught exception in the SynchronizationContext. Panic! java.lang.IllegalStateException: Could not find policy 'pick_first'. Make sure its implementation is either registered to LoadBalancerRegistry or included in META-INF/services/io.grpc.LoadBalancerProvider from your jar files. at io.grpc.internal.AutoConfiguredLoadBalancerFactory$AutoConfiguredLoadBalancer.(AutoConfiguredLoadBalancerFactory.java:94) at io.grpc.internal.AutoConfiguredLoadBalancerFactory.newLoadBalancer(AutoConfiguredLoadBalancerFactory.java:65) at io.grpc.internal.ManagedChannelImpl.exitIdleMode(ManagedChannelImpl.java:375) at io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider$1ExitIdleModeForTransport.run(ManagedChannelImpl.java:469) at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95) at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127) at io.grpc.internal.ManagedChannelImpl$ChannelTransportProvider.get(ManagedChannelImpl.java:473) at io.grpc.internal.ClientCallImpl.startInternal(ClientCallImpl.java:253) at io.grpc.internal.ClientCallImpl.start(ClientCallImpl.java:210) at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:32) at com.google.api.gax.grpc.GrpcHeaderInterceptor$1.start(GrpcHeaderInterceptor.java:94) at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:314) at 
io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:288)
    at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:200)
    at com.google.api.gax.grpc.GrpcDirectCallable.futureCall(GrpcDirectCallable.java:58)
    at com.google.api.gax.grpc.GrpcUnaryRequestParamCallable.futureCall(GrpcUnaryRequestParamCallable.java:65)
    at com.google.api.gax.grpc.GrpcExceptionCallable.futureCall(GrpcExceptionCallable.java:64)
    at com.google.api.gax.rpc.AttemptCallable.call(AttemptCallable.java:86)
    at com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:63)
    at com.google.api.gax.rpc.RetryingCallable.futureCall(RetryingCallable.java:41)
    at com.google.api.gax.tracing.TracedBatchingCallable.futureCall(TracedBatchingCallable.java:82)
    at com.google.api.gax.rpc.BatchingCallable.futureCall(BatchingCallable.java:79)
    at com.google.api.gax.rpc.UnaryCallable$1.futureCall(UnaryCallable.java:126)
    at com.google.api.gax.rpc.UnaryCallable.futureCall(UnaryCallable.java:87)
    at com.google.cloud.pubsub.v1.Publisher.publishCall(Publisher.java:425)
    at com.google.cloud.pubsub.v1.Publisher.publishOutstandingBatch(Publisher.java:471)
    at com.google.cloud.pubsub.v1.Publisher.publishAllWithoutInflight(Publisher.java:399)
    at com.google.cloud.pubsub.v1.Publisher.access$1100(Publisher.java:88)
    at com.google.cloud.pubsub.v1.Publisher$2.run(Publisher.java:326)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750) {noformat}

The code sample:
{code:java}
SinkFunction<String> pubsubSink =
    PubSubSink.newBuilder()
        .withSerializationSchema(
            (SerializationSchema<String>) s -> s.getBytes(StandardCharsets.UTF_8))
        .withProjectName("")
        .withTopicName("")
        .build();

dataStream.addSink(pubsubSink)
    .name("Pub-sub-sink");
{code}

I use the Maven Assembly Plugin to create the uber JAR:
{noformat}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <version>2.6</version>
  <configuration>
    <archive>
      <manifest>
        <mainClass>org.example.flink.Main</mainClass>
      </manifest>
    </archive>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
  <executions>
    <execution>
      <id>make-assembly</id>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>
{noformat}

The content of the JAR:
{noformat}
➜ jar tf MyApp.jar | grep io.grpc.LoadBalancerProvider
io/grpc/LoadBalancerProvider$UnknownConfig.class
META-INF/services/io.grpc.Lo
{noformat}
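(An assumption on my side, not something verified for this job: gRPC registers its load balancers through {{META-INF/services}} provider files, and a plain assembly build can end up with only one dependency's copy of each file. A commonly suggested alternative is the Maven Shade Plugin with {{ServicesResourceTransformer}}, which merges those provider files across dependencies; version and coordinates below are illustrative.)
{code:xml}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.2.4</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- merges META-INF/services files (e.g. io.grpc.LoadBalancerProvider)
               instead of letting one JAR's copy overwrite the others -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
{code}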
[jira] [Comment Edited] (FLINK-28003) An unexpected replacement happened in SqlClient, for example, replace 'pos ' to 'POSITION'
[ https://issues.apache.org/jira/browse/FLINK-28003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553870#comment-17553870 ]

Jing Zhang edited comment on FLINK-28003 at 6/14/22 3:14 AM:
-
As discussed with [~fsk119] offline, we would add a configuration option to disable SQL completion in SqlClient. Besides, for '-f sqlFile', it makes sense to disable SQL completion by default.

was (Author: qingru zhang):
As discussed with [~fsk119] offline, we add a configuration option to disable SQL completion in SqlClient. Besides, for '-f sqlFile', it makes sense to disable SQL completion by default.

> An unexpected replacement happened in SqlClient, for example, replace 'pos '
> to 'POSITION'
> --
>
> Key: FLINK-28003
> URL: https://issues.apache.org/jira/browse/FLINK-28003
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Client
> Affects Versions: 1.15.0
> Reporter: Jing Zhang
> Assignee: Shengkai Fang
> Priority: Major
> Attachments: zj_test.sql
>
> When I run the following SQL in SqlClient using 'sql-client.sh -f zj_test.sql'
> {code:java}
> create table if not exists db.zj_test(
>     pos int,
>     rank_cmd string
> )
> partitioned by (
>     `p_date` string,
>     `p_hourmin` string);
>
> INSERT OVERWRITE TABLE db.zj_test PARTITION (p_date='20220605', p_hourmin = '0100')
> SELECT
>     pos,
>     rank_cmd
> FROM db.sourceT
> where p_date = '20220605' and p_hourmin = '0100'; {code}
> an error is thrown because the 'pos' field is changed to 'POSITION'. I guess `SqlCompleter` in the sql-client module might be doing something wrong.
> The error can be reproduced using the attached file.

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
[jira] [Commented] (FLINK-28003) An unexpected replacement happened in SqlClient, for example, replace 'pos ' to 'POSITION'
[ https://issues.apache.org/jira/browse/FLINK-28003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553870#comment-17553870 ]

Jing Zhang commented on FLINK-28003:
As discussed with [~fsk119] offline, we add a configuration option to disable SQL completion in SqlClient. Besides, for '-f sqlFile', it makes sense to disable SQL completion by default.
[jira] [Assigned] (FLINK-28003) An unexpected replacement happened in SqlClient, for example, replace 'pos ' to 'POSITION'
[ https://issues.apache.org/jira/browse/FLINK-28003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jing Zhang reassigned FLINK-28003:
--
Assignee: Shengkai Fang
[GitHub] [flink] lsyldliu commented on pull request #19860: [FLINK-27658][table] FlinkUserCodeClassLoader expose addURL method to allow to register jar dynamically
lsyldliu commented on PR #19860: URL: https://github.com/apache/flink/pull/19860#issuecomment-1154661119 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-27944) IO metrics collision happens if a task has union inputs
[ https://issues.apache.org/jira/browse/FLINK-27944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu updated FLINK-27944:
Fix Version/s: 1.15.2
(was: 1.15.1)

> IO metrics collision happens if a task has union inputs
> ---
>
> Key: FLINK-27944
> URL: https://issues.apache.org/jira/browse/FLINK-27944
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Metrics
> Affects Versions: 1.15.0
> Reporter: Zhu Zhu
> Priority: Critical
> Fix For: 1.16.0, 1.15.2
>
> When a task has union inputs, some IO metrics (numBytesIn* and numBuffersIn*) of the different inputs may collide and fail to be registered.
>
> The problem can be reproduced with a simple job like:
> {code:java}
> DataStream<String> source1 = env.fromElements("abc");
> DataStream<String> source2 = env.fromElements("123");
> source1.union(source2).print();{code}
>
> Logs of collisions:
> {code:java}
> 2022-06-08 00:59:01,629 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInLocal'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, Shuffle, Netty, Input]
> 2022-06-08 00:59:01,629 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInLocalPerSecond'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, Shuffle, Netty, Input]
> 2022-06-08 00:59:01,629 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInLocal'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
> 2022-06-08 00:59:01,629 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInLocalPerSecond'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
> 2022-06-08 00:59:01,630 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInRemote'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, Shuffle, Netty, Input]
> 2022-06-08 00:59:01,630 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInRemotePerSecond'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, Shuffle, Netty, Input]
> 2022-06-08 00:59:01,630 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInRemote'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
> 2022-06-08 00:59:01,630 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInRemotePerSecond'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
> 2022-06-08 00:59:01,630 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBuffersInLocal'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, Shuffle, Netty, Input]
> 2022-06-08 00:59:01,630 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBuffersInLocalPerSecond'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0, Shuffle, Netty, Input]
> 2022-06-08 00:59:01,630 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBuffersInLocal'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, WordCount, Sink: Print to Std. Out, 0]
> 2022-06-08 00:59:01,630 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBuffersInLocalPerSecond'. Metric will not be reported.[, taskmanager, fa9f270e-e904-4f69-8227-8d6e26e1be62, W
> {code}
[GitHub] [flink] PatrickRen commented on pull request #19828: [FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment
PatrickRen commented on PR #19828: URL: https://github.com/apache/flink/pull/19828#issuecomment-1154653485

@becketqin @leonardBang Please take a look when you are available. Thanks!
[jira] [Updated] (FLINK-28035) Don't check num of buckets for rescale bucket condition
[ https://issues.apache.org/jira/browse/FLINK-28035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan updated FLINK-28035: -- Summary: Don't check num of buckets for rescale bucket condition (was: Enable scan reading different bucket num for rescale bucket) > Don't check num of buckets for rescale bucket condition > --- > > Key: FLINK-28035 > URL: https://issues.apache.org/jira/browse/FLINK-28035 > Project: Flink > Issue Type: Sub-task > Components: Table Store >Affects Versions: table-store-0.2.0 >Reporter: Jane Chan >Priority: Major > Fix For: table-store-0.2.0 > > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink] PatrickRen commented on pull request #19828: [FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment
PatrickRen commented on PR #19828: URL: https://github.com/apache/flink/pull/19828#issuecomment-1154652936

Thanks for the review @zou-can! Initially I tried to fix this case at the root, so that `KafkaPartitionSplitReader#wakeup` would only wake up a blocking `KafkaConsumer#poll` invocation, but I realized that is not possible because the Kafka consumer does not expose such an API. I have updated my code to add a wrapper around Kafka consumer calls that catches `WakeupException` and retries on exception.
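As a sketch of the idea (my own illustration, not the PR's actual code): wrap each consumer invocation so that a wakeup-style exception triggers a bounded retry instead of failing the split assignment. The `WakeupException` class below is a stand-in for `org.apache.kafka.common.errors.WakeupException`, defined locally to keep the example self-contained.

```java
import java.util.function.Supplier;

public class WakeupRetrySketch {
    /** Stand-in for org.apache.kafka.common.errors.WakeupException. */
    static class WakeupException extends RuntimeException {}

    /**
     * Invokes the given call, retrying when it is interrupted by a wakeup
     * that was actually meant for a blocking poll. A bounded retry count
     * guards against livelock.
     */
    static <T> T retryOnWakeup(Supplier<T> call, int maxRetries) {
        WakeupException last = null;
        for (int i = 0; i <= maxRetries; i++) {
            try {
                return call.get();
            } catch (WakeupException e) {
                last = e; // the wakeup targeted a poll; retry this call
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        // A call that is "woken up" twice before succeeding.
        int[] attempts = {0};
        String result = retryOnWakeup(() -> {
            if (attempts[0]++ < 2) {
                throw new WakeupException();
            }
            return "assigned";
        }, 5);
        System.out.println(result + " after " + attempts[0] + " attempts"); // prints "assigned after 3 attempts"
    }
}
```

In the real reader the supplier would wrap calls such as committing offsets or fetching positions during split assignment.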
[jira] [Comment Edited] (FLINK-28003) An unexpected replacement happened in SqlClient, for example, replace 'pos ' to 'POSITION'
[ https://issues.apache.org/jira/browse/FLINK-28003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553860#comment-17553860 ]

Jing Zhang edited comment on FLINK-28003 at 6/14/22 2:51 AM:
-
[~martijnvisser] Thanks for the response; however, you might have misunderstood. I don't use a reserved keyword, I use 'pos', but SqlClient replaces it with 'position', which leads to an error. This replacement causes a lot of problems.

was (Author: qingru zhang):
[~martijnvisser] Thanks for the response; however, you might have misunderstood. I don't use a reserved keyword, I use pos, but SqlClient replaces it with position, which leads to an error. This replacement causes a lot of problems.
[jira] [Created] (FLINK-28035) Enable scan reading different bucket num for rescale bucket
Jane Chan created FLINK-28035: - Summary: Enable scan reading different bucket num for rescale bucket Key: FLINK-28035 URL: https://issues.apache.org/jira/browse/FLINK-28035 Project: Flink Issue Type: Sub-task Components: Table Store Affects Versions: table-store-0.2.0 Reporter: Jane Chan Fix For: table-store-0.2.0 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-28003) An unexpected replacement happened in SqlClient, for example, replace 'pos ' to 'POSITION'
[ https://issues.apache.org/jira/browse/FLINK-28003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553860#comment-17553860 ]

Jing Zhang commented on FLINK-28003:
[~martijnvisser] Thanks for the response; however, you might have misunderstood. I don't use a reserved keyword, I use pos, but SqlClient replaces it with position, which leads to an error. This replacement causes a lot of problems.
[jira] [Updated] (FLINK-28003) An unexpected replacement happened in SqlClient, for example, replace 'pos ' to 'POSITION'
[ https://issues.apache.org/jira/browse/FLINK-28003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jing Zhang updated FLINK-28003:
---
Summary: An unexpected replacement happened in SqlClient, for example, replace 'pos ' to 'POSITION' (was: 'pos ' field would be updated to 'POSITION' when use SqlClient)
[jira] [Reopened] (FLINK-28003) 'pos ' field would be updated to 'POSITION' when use SqlClient
[ https://issues.apache.org/jira/browse/FLINK-28003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jing Zhang reopened FLINK-28003:
[jira] [Commented] (FLINK-27951) Translate the "Debugging Classloading" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-27951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553859#comment-17553859 ]

Zili Sun commented on FLINK-27951:
--
[~martijnvisser] I have finished the work and opened a PR; could you help review it? Thanks!

> Translate the "Debugging Classloading" page into Chinese
>
> Key: FLINK-27951
> URL: https://issues.apache.org/jira/browse/FLINK-27951
> Project: Flink
> Issue Type: Improvement
> Components: chinese-translation
> Reporter: Zili Sun
> Assignee: Zili Sun
> Priority: Minor
> Labels: pull-request-available
>
> The page "Debugging Classloading" needs to be translated into Chinese.
> I'm willing to work on this issue.
[jira] [Created] (FLINK-28034) ClassCastException occurred in creating a checkpoint with merge windows
Takayuki Eimizu created FLINK-28034:
---
Summary: ClassCastException occurred in creating a checkpoint with merge windows
Key: FLINK-28034
URL: https://issues.apache.org/jira/browse/FLINK-28034
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing, Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Takayuki Eimizu

h1. Summary

In Flink 1.15.0, the combination of the following features always causes a ClassCastException:
- Session Window
- Checkpoint
- Keyed State

The following repository provides minimal source code that combines these features to reproduce the exception.
[https://github.com/t-eimizu/flink-checkpoint-with-merging-window]

h1. Description

h2. How the Exception Occurs

In the process window function of a session window, we must use `context.globalState()` instead of `context.windowState()`. If you use `context.windowState()` in this situation, Flink throws `UnsupportedOperationException`. So we have to do the following:

{code:java}
stPreviousValue = context.globalState().getState(desc4PreviousValue);
{code}

Then stPreviousValue will have the following fields:
||Field Name||Value||
|currentNamespace|VoidNamespace|
|namespaceSerializer|TimeWindow$Serializer|

As a result, when Flink creates a checkpoint for this job, a ClassCastException occurs:

{code:java}
2022-06-14 11:04:57,212 INFO org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - ProcessingData -> Sink: PrintData (1/1)#0 - asynchronous part of checkpoint 1 could not be completed.
java.util.concurrent.ExecutionException: java.lang.ClassCastException: class org.apache.flink.runtime.state.VoidNamespace cannot be cast to class org.apache.flink.streaming.api.windowing.windows.TimeWindow (org.apache.flink.runtime.state.VoidNamespace and org.apache.flink.streaming.api.windowing.windows.TimeWindow are in unnamed module of loader 'app')
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
    at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
    at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:645) ~[flink-core-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) [flink-streaming-java-1.15.0.jar:1.15.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: java.lang.ClassCastException: class org.apache.flink.runtime.state.VoidNamespace cannot be cast to class org.apache.flink.streaming.api.windowing.windows.TimeWindow (org.apache.flink.runtime.state.VoidNamespace and org.apache.flink.streaming.api.windowing.windows.TimeWindow are in unnamed module of loader 'app')
    at org.apache.flink.streaming.api.windowing.windows.TimeWindow$Serializer.serialize(TimeWindow.java:130) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:145) ~[flink-runtime-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:116) ~[flink-runtime-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:38) ~[flink-runtime-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.lambda$asyncSnapshot$3(HeapSnapshotStrategy.java:172) ~[flink-runtime-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91) ~[flink-runtime-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88) ~[flink-runtime-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78) ~[flink-runtime-1.15.0.jar:1.15.0]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:642) ~[flink-core-1.15.0.jar:1.15.0]
    ... 6 more
{code}

h2. Workaround

Turn off checkpointing.
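(Stripped of the Flink machinery, the reported failure reduces to a plain Java class cast going wrong: the keyed state keeps its namespace as `VoidNamespace`, while the snapshot path hands it to `TimeWindow$Serializer`, which casts it to `TimeWindow`. The following stdlib-only illustration uses hypothetical stand-in classes, not Flink's.)

```java
public class NamespaceCastDemo {
    // Stand-ins for org.apache.flink.runtime.state.VoidNamespace and
    // org.apache.flink.streaming.api.windowing.windows.TimeWindow.
    static class VoidNamespace {}
    static class TimeWindow {}

    public static void main(String[] args) {
        // globalState() keeps the state under the "void" namespace...
        Object currentNamespace = new VoidNamespace();
        try {
            // ...but the window serializer chosen for the snapshot casts it to a TimeWindow.
            TimeWindow w = (TimeWindow) currentNamespace;
        } catch (ClassCastException e) {
            System.out.println("snapshot fails with ClassCastException");
        }
    }
}
```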
[jira] [Closed] (FLINK-28029) Checkpoint always hangs when running some jobs
[ https://issues.apache.org/jira/browse/FLINK-28029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang closed FLINK-28029. Resolution: Duplicate > Checkpoint always hangs when running some jobs > -- > > Key: FLINK-28029 > URL: https://issues.apache.org/jira/browse/FLINK-28029 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.14.3 >Reporter: Pauli Gandhi >Priority: Major > > We have noticed that Flink jobs hangs and eventually times out after 2 hours > every time at the first checkpoint after it completes 15/23 acknowledgments > (65%). There is no cpu activity but yet there are number of tasks reporting > 100% back pressure. It is peculiar to this job and slight modifications to > this job. We have created many Flink jobs in the past and never encountered > the issue. > Here are the things we tried to narrow down the problem > * The job runs fine if checkpointing is disabled. > * Increasing the number of task managers and parallelism to 2 seems to help > the job complete. However, it stalled again when we sent a larger data set. > * Increased taskmanager memory from 4 GB to 16 GB and cpu from 1 to 4 but > didn't help. > * Sometimes restarting the job manager helps but at other times not. > * Breaking up the job into smaller parts helps the job to finish. > * Analyzed the the thread dump and it appears all threads are either in > sleeping or wait state. > Here are the environment details > * Flink version 1.14.3 > * Running Kubernetes > * Using RocksDB state backend. > * Checkpoint storage is S3 storage using the Presto library > * Exactly Once Semantics with unaligned checkpoints enabled. > * Checkpoint timeout 2 hours > * Maximum concurrent checkpoints is 1 > * Taskmanager CPU: 4, Slots: 1, Process Size: 12 GB > * Using Kafka for input and output > I have attached the task manager logs, thread dump, and screen shots of the > job graph and stalled checkpoint. 
[jira] [Closed] (FLINK-28030) Checkpoint always hangs when running some jobs
[ https://issues.apache.org/jira/browse/FLINK-28030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang closed FLINK-28030.
Resolution: Duplicate
[jira] [Closed] (FLINK-28031) Checkpoint always hangs when running some jobs
[ https://issues.apache.org/jira/browse/FLINK-28031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang closed FLINK-28031.
Resolution: Duplicate
[GitHub] [flink] paul8263 commented on pull request #19772: [FLINK-27579][client] The param client.timeout can not be set by dyna…
paul8263 commented on PR #19772: URL: https://github.com/apache/flink/pull/19772#issuecomment-1154618703

Hi @wangyang0918, could you please review this PR? Thanks.
[jira] [Commented] (FLINK-28033) find and output new min watermark may be wrong when in multichannel
[ https://issues.apache.org/jira/browse/FLINK-28033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553848#comment-17553848 ] YeAble commented on FLINK-28033:

In my mind, it could be done like this:
{code:java}
long newMinWatermark = Long.MAX_VALUE;
boolean hasAlignedChannels = false;

// determine new overall watermark by considering only watermark-aligned
// channels across all channels
for (InputChannelStatus channelStatus : channelStatuses) {
    if (channelStatus.isWatermarkAligned && channelStatus.watermark != Long.MIN_VALUE) {
        hasAlignedChannels = true;
        newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
    }
}

// we acknowledge and output the new overall watermark if it really is aggregated
// from some remaining aligned channel, and is also larger than the last output watermark
if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
    lastOutputWatermark = newMinWatermark;
    output.emitWatermark(new Watermark(lastOutputWatermark));
}
{code}
I'm not sure this is right, so I'm reporting the question here in Jira in the hope of getting help.
> find and output new min watermark may be wrong when in multichannel
> -------------------------------------------------------------------
>
>                 Key: FLINK-28033
>                 URL: https://issues.apache.org/jira/browse/FLINK-28033
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>            Reporter: YeAble
>            Priority: Blocker
>
> File: StatusWatermarkValve.java
> Method: findAndOutputNewMinWatermarkAcrossAlignedChannels
> {code:java}
> long newMinWatermark = Long.MAX_VALUE;
> boolean hasAlignedChannels = false;
>
> // determine new overall watermark by considering only watermark-aligned
> // channels across all channels
> for (InputChannelStatus channelStatus : channelStatuses) {
>     if (channelStatus.isWatermarkAligned) {
>         hasAlignedChannels = true;
>         newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
>     }
> }
>
> // we acknowledge and output the new overall watermark if it really is
> // aggregated from some remaining aligned channel, and is also larger
> // than the last output watermark
> if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
>     lastOutputWatermark = newMinWatermark;
>     output.emitWatermark(new Watermark(lastOutputWatermark));
> }
> {code}
> A channel status's watermark is initialized to Long.MIN_VALUE. When one
> channel's watermark has advanced but the other channels' have not, the
> newMinWatermark is always Long.MIN_VALUE and the output never emits a
> watermark.
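The reported behavior and the proposed guard can be sketched in isolation. The following is a minimal, self-contained model: `ChannelStatus` is a simplified stand-in for Flink's `InputChannelStatus`, and `minWatermark` paraphrases the aggregation loop from the snippets above; it is not the actual Flink code.

```java
import java.util.List;

// Simplified stand-in for Flink's InputChannelStatus (not the real class).
class ChannelStatus {
    final long watermark;
    final boolean isWatermarkAligned;
    ChannelStatus(long watermark, boolean aligned) {
        this.watermark = watermark;
        this.isWatermarkAligned = aligned;
    }
}

public class WatermarkMinDemo {
    // Aggregate the minimum watermark across aligned channels.
    // When skipUninitialized is true, channels still at Long.MIN_VALUE
    // (i.e. channels that have not received any watermark yet) are
    // ignored, as in the fix proposed in the comment above.
    static long minWatermark(List<ChannelStatus> channels, boolean skipUninitialized) {
        long newMin = Long.MAX_VALUE;
        boolean hasAligned = false;
        for (ChannelStatus c : channels) {
            if (c.isWatermarkAligned
                    && !(skipUninitialized && c.watermark == Long.MIN_VALUE)) {
                hasAligned = true;
                newMin = Math.min(c.watermark, newMin);
            }
        }
        // Mirror the snippet: only a result aggregated from some aligned
        // channel is meaningful; otherwise return the neutral MAX_VALUE.
        return hasAligned ? newMin : Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        // One channel has advanced to 42; the other is still at its initial value.
        List<ChannelStatus> channels = List.of(
                new ChannelStatus(42L, true),
                new ChannelStatus(Long.MIN_VALUE, true));

        // Reported behavior: the untouched channel pins the minimum at
        // Long.MIN_VALUE, so the "newMinWatermark > lastOutputWatermark"
        // check never fires and nothing is emitted.
        System.out.println(minWatermark(channels, false)); // -9223372036854775808

        // With the proposed guard the idle channel is skipped.
        System.out.println(minWatermark(channels, true)); // 42
    }
}
```

Note that skipping uninitialized channels changes semantics: the overall watermark may then advance past elements that the still-idle channel delivers later, which may be why the original code waits for every aligned channel.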
[GitHub] [flink] lsyldliu commented on pull request #19860: [FLINK-27658][table] FlinkUserCodeClassLoader expose addURL method to allow to register jar dynamically
lsyldliu commented on PR #19860: URL: https://github.com/apache/flink/pull/19860#issuecomment-1154606842 @flinkbot run azure
[jira] [Updated] (FLINK-28033) find and output new min watermark may be wrong when in multichannel
[ https://issues.apache.org/jira/browse/FLINK-28033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] YeAble updated FLINK-28033:
    Priority: Blocker  (was: Major)

> find and output new min watermark may be wrong when in multichannel
> -------------------------------------------------------------------
[jira] [Created] (FLINK-28033) find and output new min watermark may be wrong when in multichannel
YeAble created FLINK-28033:
    Summary: find and output new min watermark may be wrong when in multichannel
    Key: FLINK-28033
    URL: https://issues.apache.org/jira/browse/FLINK-28033
    Project: Flink
    Issue Type: Bug
    Components: Runtime / Task
    Affects Versions: 1.15.0
    Reporter: YeAble

File: StatusWatermarkValve.java
Method: findAndOutputNewMinWatermarkAcrossAlignedChannels
{code:java}
long newMinWatermark = Long.MAX_VALUE;
boolean hasAlignedChannels = false;

// determine new overall watermark by considering only watermark-aligned
// channels across all channels
for (InputChannelStatus channelStatus : channelStatuses) {
    if (channelStatus.isWatermarkAligned) {
        hasAlignedChannels = true;
        newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
    }
}

// we acknowledge and output the new overall watermark if it really is aggregated
// from some remaining aligned channel, and is also larger than the last output watermark
if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
    lastOutputWatermark = newMinWatermark;
    output.emitWatermark(new Watermark(lastOutputWatermark));
}
{code}
A channel status's watermark is initialized to Long.MIN_VALUE. When one channel's watermark has advanced but the other channels' have not, the newMinWatermark is always Long.MIN_VALUE and the output never emits a watermark.
[jira] [Updated] (FLINK-28033) find and output new min watermark may be wrong when in multichannel
[ https://issues.apache.org/jira/browse/FLINK-28033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] YeAble updated FLINK-28033:
    Affects Version/s: (was: 1.15.0)

> find and output new min watermark may be wrong when in multichannel
> -------------------------------------------------------------------