[GitHub] [flink] flinkbot commented on pull request #20055: [FLINK-28215][Buildsystem] Update Maven Surefire plugin to 3.0.0-M7
flinkbot commented on PR #20055: URL: https://github.com/apache/flink/pull/20055#issuecomment-1163981586

## CI report:

* b5d912bea15a3b990e6cd3cd5ab8cda1828e221b UNKNOWN

Bot commands: The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-28209) KafkaSink with EXACTLY_ONCE produce reduplicate data(flink kafka connector1.14.4)
[ https://issues.apache.org/jira/browse/FLINK-28209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-28209: --- Component/s: Connectors / Kafka

> KafkaSink with EXACTLY_ONCE produce reduplicate data(flink kafka
> connector1.14.4)
> --
>
> Key: FLINK-28209
> URL: https://issues.apache.org/jira/browse/FLINK-28209
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.14.4
> Reporter: tanyao
> Priority: Major
> Attachments: image-2022-06-23-10-49-01-213.png,
> image-2022-06-23-10-58-15-141.png
>
> I'm trying to read the MySQL binlog and transport it to Kafka. Here is what I'm using:
> *Flink: 1.14.4*
> *Flink-CDC: 2.2*
> *Kafka: CDH 6.2 (2.1)*
>
> *Stage 1:*
> The mysql-cdc connector was used to consume MySQL binlog data. About 400,000 rows changed when I executed some SQL in MySQL, and I received all 400,000 rows without any data loss or duplication, exactly the number of rows changed in MySQL. So I don't think CDC is the problem.
>
> *Stage 2:*
> When I got the binlog data, I first deserialized it into a Tuple2, where tuple2.f0 has the format "db.table" and is intended to be used as the Kafka topic for each distinct db.table, and tuple2.f1 contains only the binlog value.
>
> *Stage 3:*
> Then I used KafkaSink (which was introduced in Flink 1.14) to write the binlog to the different Kafka topics indicated by tuple2.f0.
> Here is the code:
> !image-2022-06-23-10-49-01-213.png!
>
> As you can see, I just want to use EXACTLY_ONCE semantics, but here is the problem: after waiting about 10 minutes for all the binlog to be consumed, I checked all the data in a single Kafka topic (just one topic); the total number of rows was bigger than the number of binlog rows changed in MySQL, because a lot of duplicated data was written to Kafka. For example:
> !image-2022-06-23-10-58-15-141.png!
>
> *Stage 4:*
> However, when I changed EXACTLY_ONCE to AT_LEAST_ONCE, everything worked very well: no more duplicated data in Kafka.
>
> So I'm wondering: is there a bug in KafkaSink when EXACTLY_ONCE is configured? Can anybody help? Hoping for your answer, sincerely.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
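One frequent explanation for this symptom (not confirmed from the report alone) is the consumer side: under EXACTLY_ONCE, KafkaSink writes records inside Kafka transactions, and a verification consumer left at the default `isolation.level=read_uncommitted` also counts records from aborted transactions, which looks like duplicated data. A minimal, self-contained sketch of the consumer property to check; the class and method names are illustrative, not the reporter's code:

```java
import java.util.Properties;

public class IsolationConfig {

    // Under EXACTLY_ONCE, the sink's records only become "final" when their
    // Kafka transaction commits. A consumer with the default
    // isolation.level=read_uncommitted also sees records from aborted
    // transactions (e.g. after checkpoint failures or restarts), so counting
    // the topic that way over-counts. read_committed excludes aborted records.
    public static Properties committedReaderProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("isolation.level", "read_committed"); // the key setting
        return props;
    }

    public static void main(String[] args) {
        Properties p = committedReaderProps("localhost:9092", "verify-count");
        System.out.println(p.getProperty("isolation.level"));
    }
}
```

If the row counts match once the verification consumer uses `read_committed`, the "duplicates" were aborted-transaction records rather than a KafkaSink bug.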
[jira] [Updated] (FLINK-24666) Add job level "table.exec.state-stale.error-handling" option and apply to related stateful stream operators
[ https://issues.apache.org/jira/browse/FLINK-24666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee updated FLINK-24666:

Description:
In stream processing, records are deleted when they exceed the state TTL (if configured), and when a corresponding record's update arrives again, the operator may not be able to handle it properly. We need a unified error-handling mechanism for this situation, instead of each stateful operator currently handling it on its own. E.g., currently there is a lenient option on RetractableTopNFunction that is not exposed to users:
{quote}// flag to skip records with non-exist error instead to fail, true by default.
private final boolean lenient = true;{quote}
So there is no chance to raise the exception when record(s) are unexpectedly cleared by the state TTL. Commonly this happens because of a too-short TTL (at line 190) or an inconsistency between the two internal states (dataState and treeMap) elsewhere:
{quote}List<RowData> inputs = dataState.get(key);
if (inputs == null) {
    // Skip the data if it's state is cleared because of state ttl.
    if (lenient) {
        return;
    } else {
        throw new RuntimeException(STATE_CLEARED_WARN_MSG);
    }
}{quote}
We'd better expose this to users (the default value can be true to stay consistent with previous versions), and it should be unified to cover all stateful stream operators rather than being specific to RetractableTopNFunction. The new option ''

was:
In stream processing, records are deleted when they exceed the state TTL (if configured), and when a corresponding record's update arrives again, the operator may not be able to handle it properly. We need a unified error-handling mechanism for this situation, instead of each stateful operator currently handling it on its own. E.g., currently there is a lenient option on RetractableTopNFunction that is not exposed to users:
{quote}// flag to skip records with non-exist error instead to fail, true by default.
private final boolean lenient = true;{quote}
So there is no chance to raise the exception when record(s) are unexpectedly cleared by the state TTL. Commonly this happens because of a too-short TTL (at line 190) or an inconsistency between the two internal states (dataState and treeMap) elsewhere:
{quote}List<RowData> inputs = dataState.get(key);
if (inputs == null) {
    // Skip the data if it's state is cleared because of state ttl.
    if (lenient) {
        return;
    } else {
        throw new RuntimeException(STATE_CLEARED_WARN_MSG);
    }
}{quote}
We'd better expose this to users (the default value can be true to stay consistent with previous versions), and it should be unified to cover all stateful stream operators rather than being specific to RetractableTopNFunction.

> Add job level "table.exec.state-stale.error-handling" option and apply to
> related stateful stream operators
> ---
>
> Key: FLINK-24666
> URL: https://issues.apache.org/jira/browse/FLINK-24666
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Affects Versions: 1.14.0, 1.13.3
> Reporter: lincoln lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.16.0
>
> In stream processing, records are deleted when they exceed the state TTL (if configured), and when a corresponding record's update arrives again, the operator may not be able to handle it properly. We need a unified error-handling mechanism for this situation, instead of each stateful operator currently handling it on its own.
> E.g., currently there is a lenient option on RetractableTopNFunction that is not exposed to users:
> {quote}// flag to skip records with non-exist error instead to fail, true by default.
> private final boolean lenient = true;{quote}
> So there is no chance to raise the exception when record(s) are unexpectedly cleared by the state TTL. Commonly this happens because of a too-short TTL (at line 190) or an inconsistency between the two internal states (dataState and treeMap) elsewhere:
> {quote}List<RowData> inputs = dataState.get(key);
> if (inputs == null) {
>     // Skip the data if it's state is cleared because of state ttl.
>     if (lenient) {
>         return;
>     } else {
>         throw new RuntimeException(STATE_CLEARED_WARN_MSG);
>     }
> }{quote}
> We'd better expose this to users (the default value can be true to stay consistent with previous versions), and it should be unified to cover all stateful stream operators rather than being specific to RetractableTopNFunction.
> The new option ''
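The lenient/strict branch quoted above can be sketched as a small self-contained toy; the class and method names below are illustrative and this is not Flink's actual RetractableTopNFunction, only the control flow the proposed option would make configurable:

```java
import java.util.List;
import java.util.Map;

public class StateStaleHandling {

    public static final String STATE_CLEARED_WARN_MSG =
            "State was cleared because of state TTL; the result may be incorrect.";

    // Mimics handling a retraction whose accumulate-side state was already
    // dropped by TTL. With lenient=true the stale record is silently skipped
    // (returns false); with lenient=false the job fails fast. The proposed
    // job-level option would control this flag for all stateful operators.
    public static boolean processRetraction(
            Map<String, List<String>> dataState, String key, boolean lenient) {
        List<String> inputs = dataState.get(key);
        if (inputs == null) {
            // Skip the data if its state is cleared because of state TTL.
            if (lenient) {
                return false; // skip the stale retraction
            }
            throw new RuntimeException(STATE_CLEARED_WARN_MSG);
        }
        return true; // state found, proceed normally
    }

    public static void main(String[] args) {
        Map<String, List<String>> state = Map.of("k1", List.of("row1"));
        System.out.println(processRetraction(state, "k1", false));
        System.out.println(processRetraction(state, "gone", true));
        try {
            processRetraction(state, "gone", false);
        } catch (RuntimeException e) {
            System.out.println("failed fast: " + e.getMessage());
        }
    }
}
```

The design question in the ticket is exactly which of the two branches (skip silently vs. fail fast) should be the default and how it is surfaced to users.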
[jira] [Commented] (FLINK-27745) ClientUtilsTest.uploadAndSetUserArtifacts failed with NoClassDefFoundError
[ https://issues.apache.org/jira/browse/FLINK-27745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557825#comment-17557825 ] Martijn Visser commented on FLINK-27745: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37082=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7=8183 > ClientUtilsTest.uploadAndSetUserArtifacts failed with NoClassDefFoundError > -- > > Key: FLINK-27745 > URL: https://issues.apache.org/jira/browse/FLINK-27745 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Assignee: Yang Wang >Priority: Major > Labels: test-stability > > {code:java} > 2022-05-23T10:27:20.0131798Z May 23 10:27:20 [ERROR] Tests run: 2, Failures: > 0, Errors: 1, Skipped: 0, Time elapsed: 0.729 s <<< FAILURE! - in > org.apache.flink.runtime.client.ClientUtilsTest > 2022-05-23T10:27:20.0133550Z May 23 10:27:20 [ERROR] > org.apache.flink.runtime.client.ClientUtilsTest.uploadAndSetUserArtifacts > Time elapsed: 0.639 s <<< ERROR! > 2022-05-23T10:27:20.0134569Z May 23 10:27:20 > org.apache.flink.util.FlinkException: Could not upload job files. 
> 2022-05-23T10:27:20.0135587Z May 23 10:27:20 at > org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:86) > 2022-05-23T10:27:20.0136861Z May 23 10:27:20 at > org.apache.flink.runtime.client.ClientUtils.extractAndUploadJobGraphFiles(ClientUtils.java:62) > 2022-05-23T10:27:20.0138163Z May 23 10:27:20 at > org.apache.flink.runtime.client.ClientUtilsTest.uploadAndSetUserArtifacts(ClientUtilsTest.java:137) > 2022-05-23T10:27:20.0139618Z May 23 10:27:20 at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2022-05-23T10:27:20.0140639Z May 23 10:27:20 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 2022-05-23T10:27:20.0142022Z May 23 10:27:20 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2022-05-23T10:27:20.0144222Z May 23 10:27:20 at > java.lang.reflect.Method.invoke(Method.java:498) > 2022-05-23T10:27:20.0145368Z May 23 10:27:20 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > 2022-05-23T10:27:20.0146856Z May 23 10:27:20 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > 2022-05-23T10:27:20.0147934Z May 23 10:27:20 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > 2022-05-23T10:27:20.0148815Z May 23 10:27:20 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > 2022-05-23T10:27:20.0149537Z May 23 10:27:20 at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > 2022-05-23T10:27:20.0150204Z May 23 10:27:20 at > org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > 2022-05-23T10:27:20.0150848Z May 23 10:27:20 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > 2022-05-23T10:27:20.0151599Z May 23 10:27:20 at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > 2022-05-23T10:27:20.0152293Z May 23 10:27:20 
at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > 2022-05-23T10:27:20.0153073Z May 23 10:27:20 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > 2022-05-23T10:27:20.0153876Z May 23 10:27:20 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > 2022-05-23T10:27:20.0154555Z May 23 10:27:20 at > org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > 2022-05-23T10:27:20.0155189Z May 23 10:27:20 at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > 2022-05-23T10:27:20.0155846Z May 23 10:27:20 at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > 2022-05-23T10:27:20.0156708Z May 23 10:27:20 at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > 2022-05-23T10:27:20.0157380Z May 23 10:27:20 at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > 2022-05-23T10:27:20.0158056Z May 23 10:27:20 at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > 2022-05-23T10:27:20.0158760Z May 23 10:27:20 at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > 2022-05-23T10:27:20.0159493Z May 23 10:27:20 at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > 2022-05-23T10:27:20.0160124Z May 23 10:27:20 at > org.junit.rules.RunRules.evaluate(RunRules.java:20) > 2022-05-23T10:27:20.0160740Z May 23 10:27:20 at >
[jira] [Updated] (FLINK-28215) Bump Maven Surefire plugin to 3.0.0-M7
[ https://issues.apache.org/jira/browse/FLINK-28215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28215: --- Labels: pull-request-available (was: ) > Bump Maven Surefire plugin to 3.0.0-M7 > -- > > Key: FLINK-28215 > URL: https://issues.apache.org/jira/browse/FLINK-28215 > Project: Flink > Issue Type: Technical Debt > Components: Build System >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > Labels: pull-request-available >
[GitHub] [flink] MartijnVisser opened a new pull request, #20055: [FLINK-28215][Buildsystem] Update Maven Surefire plugin to 3.0.0-M7
MartijnVisser opened a new pull request, #20055: URL: https://github.com/apache/flink/pull/20055

## What is the purpose of the change

* Update Maven Surefire plugin to latest version

## Brief change log

* Bumped dependency in main `pom.xml`

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
[jira] [Created] (FLINK-28215) Bump Maven Surefire plugin to 3.0.0-M7
Martijn Visser created FLINK-28215: -- Summary: Bump Maven Surefire plugin to 3.0.0-M7 Key: FLINK-28215 URL: https://issues.apache.org/jira/browse/FLINK-28215 Project: Flink Issue Type: Technical Debt Components: Build System Reporter: Martijn Visser Assignee: Martijn Visser
[jira] [Commented] (FLINK-28187) Duplicate job submission for FlinkSessionJob
[ https://issues.apache.org/jira/browse/FLINK-28187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557824#comment-17557824 ] Aitozi commented on FLINK-28187:

> For sessionjobs we need to cover both using the jobid magic somehow

Can this be done by generating the JobID from the resource UID?
1. The Dispatcher will throw DuplicateJobSubmissionException if the same JobID is submitted twice.
2. An upgrade happens with the following steps: 1) suspend the old job and set the reconcile status to upgrading; 2) submit the job with the new spec and the same JobID; 3) if the job submission succeeded but somehow threw a timeout, the observer can detect that the JobID is running, then update the reconcile status to deployed and update the lastReconciledSpec.

Do you think this is a valid solution? [~gyfora]

> Duplicate job submission for FlinkSessionJob
> --
>
> Key: FLINK-28187
> URL: https://issues.apache.org/jira/browse/FLINK-28187
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.0.0
> Reporter: Jeesmon Jacob
> Priority: Critical
> Attachments: flink-operator-log.txt
>
> During a session job submission, if a deployment error (ex: concurrent.TimeoutException) is hit, the operator will submit the job again. But the first submission could have succeeded on the JobManager side, and the second submission could result in a duplicate job. Operator log attached.
> Per [~gyfora]:
> The problem is that in case a deployment error was hit, the SessionJobObserver will not be able to tell whether it has submitted the job or not. So it will simply try to submit it again. We have to find a mechanism to correlate Jobs on the cluster with the SessionJob CR itself. Maybe we could override the job name itself for this purpose or something like that.
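The "JobID from resource UID" idea can be sketched with a name-based UUID. This is only an illustration of the determinism argument (a stable UID always maps to the same 16-byte ID, so a retried submission collides deterministically); it is not the operator's actual implementation:

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class SessionJobIds {

    // Derive a JobID-shaped value deterministically from the FlinkSessionJob's
    // Kubernetes resource UID using a name-based (RFC 4122 v3) UUID. With a
    // stable JobID, a blind re-submission of the same resource would hit the
    // Dispatcher's DuplicateJobSubmissionException instead of starting a
    // second copy of the job. Method and class names here are hypothetical.
    public static String deriveJobId(String resourceUid) {
        UUID uuid = UUID.nameUUIDFromBytes(resourceUid.getBytes(StandardCharsets.UTF_8));
        // Flink JobIDs are 16 bytes printed as 32 hex characters; reuse the
        // UUID's 16 bytes and drop the dashes to match that shape.
        return uuid.toString().replace("-", "");
    }

    public static void main(String[] args) {
        String id1 = deriveJobId("8f5c3a1e-example-resource-uid");
        String id2 = deriveJobId("8f5c3a1e-example-resource-uid");
        System.out.println(id1);
        // Same UID always yields the same JobID, so duplicates are detectable.
        System.out.println(id1.equals(id2));
    }
}
```

A remaining design question (step 3 in the comment) is distinguishing "same JobID because of our own retried submission" from "same JobID because of a new spec generation", which likely requires mixing a generation counter or spec hash into the derived bytes.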
[jira] [Commented] (FLINK-28214) ArrayDataSerializer can not be reused to copy customized type of array data
[ https://issues.apache.org/jira/browse/FLINK-28214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557819#comment-17557819 ] Yi Tang commented on FLINK-28214: - [~gyfora] Would you mind checking this issue?

> ArrayDataSerializer can not be reused to copy customized type of array data
> --
>
> Key: FLINK-28214
> URL: https://issues.apache.org/jira/browse/FLINK-28214
> Project: Flink
> Issue Type: Improvement
> Reporter: Yi Tang
> Priority: Minor
>
> In FLINK-25238, we fixed ArrayDataSerializer to support copying customized types of array data, in a similar way to MapDataSerializer.
> MapDataSerializer#toBinaryMap always has copy semantics implicitly, but ArrayDataSerializer#toBinaryArray does not.
> So the value returned by ArrayDataSerializer#toBinaryArray will be overwritten by newly copied data.
> We should always copy from the value returned by ArrayDataSerializer#toBinaryArray explicitly in ArrayDataSerializer#copy.
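The hazard described, a reused internal buffer returned without copy semantics, can be reproduced with a self-contained toy. The class below is illustrative and unrelated to Flink's real ArrayDataSerializer; it only demonstrates why an explicit copy of the returned value is needed:

```java
import java.util.Arrays;

public class SerializerCopySketch {

    // A serializer-like object that reuses one internal buffer across calls,
    // analogous to toBinaryArray() returning a view without copy semantics.
    private final int[] reusedBuffer = new int[4];

    // Returns a view of the shared buffer, NOT a private copy: the next call
    // silently overwrites whatever the caller is still holding.
    public int[] toBinaryArray(int[] source) {
        System.arraycopy(source, 0, reusedBuffer, 0, source.length);
        return reusedBuffer;
    }

    // The fix proposed in the ticket: copy() must explicitly copy from the
    // value returned by toBinaryArray(), so the result owns its own storage.
    public int[] copy(int[] source) {
        return Arrays.copyOf(toBinaryArray(source), source.length);
    }

    public static void main(String[] args) {
        SerializerCopySketch s = new SerializerCopySketch();

        int[] first = s.toBinaryArray(new int[] {1, 2, 3, 4});
        s.toBinaryArray(new int[] {9, 9, 9, 9});
        System.out.println(first[0]); // 9: the first result was overwritten

        int[] safe = s.copy(new int[] {1, 2, 3, 4});
        s.toBinaryArray(new int[] {9, 9, 9, 9});
        System.out.println(safe[0]); // 1: the explicit copy is unaffected
    }
}
```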
[GitHub] [flink] fsk119 commented on a diff in pull request #19849: [FLINK-27767][sql-gateway] Introduce Endpoint API and utils
fsk119 commented on code in PR #19849: URL: https://github.com/apache/flink/pull/19849#discussion_r904573975 ## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/session/SessionManagerTest.java: ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.gateway.service.session; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.client.cli.DefaultCLI; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions; +import org.apache.flink.table.gateway.common.session.SessionEnvironment; +import org.apache.flink.table.gateway.common.session.SessionHandle; +import org.apache.flink.table.gateway.common.utils.MockedEndpointVersion; +import org.apache.flink.table.gateway.common.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.context.DefaultContext; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** Test for {@link SessionManager}. */ +public class SessionManagerTest { Review Comment: Thanks for your suggestions. I use Junit5 in the current implementation.
[jira] [Created] (FLINK-28214) ArrayDataSerializer can not be reused to copy customized type of array data
Yi Tang created FLINK-28214: --- Summary: ArrayDataSerializer can not be reused to copy customized type of array data Key: FLINK-28214 URL: https://issues.apache.org/jira/browse/FLINK-28214 Project: Flink Issue Type: Improvement Reporter: Yi Tang

In FLINK-25238, we fixed ArrayDataSerializer to support copying customized types of array data, in a similar way to MapDataSerializer. MapDataSerializer#toBinaryMap always has copy semantics implicitly, but ArrayDataSerializer#toBinaryArray does not. So the value returned by ArrayDataSerializer#toBinaryArray will be overwritten by newly copied data. We should always copy from the value returned by ArrayDataSerializer#toBinaryArray explicitly in ArrayDataSerializer#copy.
[jira] [Commented] (FLINK-28187) Duplicate job submission for FlinkSessionJob
[ https://issues.apache.org/jira/browse/FLINK-28187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557814#comment-17557814 ] Gyula Fora commented on FLINK-28187: Thanks for the comment. Identifying failed first deployments is slightly tricky I agree but this doesn't really affect the general requirement: 1. Have a way to detect in the FlinkService if a job for this resource is already running (throw an error) -> never allow double submission 2. Have a way to detect in the Observer if an upgrade already happened and update the lastReconciledSpec accordingly For Deployments 1) is provided by Flink itself, 2) is basically covered in the commit I sent. For sessionjobs we need to cover both using the jobid magic somehow :) > Duplicate job submission for FlinkSessionJob > > > Key: FLINK-28187 > URL: https://issues.apache.org/jira/browse/FLINK-28187 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.0.0 >Reporter: Jeesmon Jacob >Priority: Critical > Attachments: flink-operator-log.txt > > > During a session job submission if a deployment error (ex: > concurrent.TimeoutException) is hit, operator will submit the job again. But > first submission could have succeeded in jobManager side and second > submission could result in duplicate job. Operator log attached. > Per [~gyfora]: > The problem is that in case a deployment error was hit, the > SessionJobObserver will not be able to tell whether it has submitted the job > or not. So it will simply try to submit it again. We have to find a mechanism > to correlate Jobs on the cluster with the SessionJob CR itself. Maybe we > could override the job name itself for this purpose or something like that.
[GitHub] [flink] dianfu commented on a diff in pull request #20029: [FLINK-27392][cep] CEP Pattern supports definition of the maximum time gap between events
dianfu commented on code in PR #20029: URL: https://github.com/apache/flink/pull/20029#discussion_r904446329 ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java: ## @@ -579,6 +662,16 @@ private void checkIfPreviousPatternGreedy() { } } +private void checkWindowTimeBetweenEvents(Time windowTime, WithinType withinType) { +if (WithinType.PREVIOUS_AND_CURRENT.equals(withinType) +&& windowTimes.containsKey(WithinType.FIRST_AND_LAST) +&& windowTime.toMilliseconds() +> windowTimes.get(WithinType.FIRST_AND_LAST).toMilliseconds()) { +throw new MalformedPatternException( +"Window length between the previous and current event cannot be larger than which between the first and last event for pattern."); Review Comment: ```suggestion "Window length between the previous and current event cannot be larger than the window length between the first and last event for pattern."); ``` ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java: ## @@ -348,10 +370,28 @@ public Pattern optional() { * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ public Pattern oneOrMore() { +return oneOrMore(null); +} + +/** + * Specifies that this pattern can occur {@code one or more} times and time interval corresponds + * to the maximum time gap between previous and current event for each times. This means at + * least one and at most infinite number of events can be matched to this pattern. + * + * If this quantifier is enabled for a pattern {@code A.oneOrMore().followedBy(B)} and a + * sequence of events {@code A1 A2 B} appears, this will generate patterns: {@code A1 B} and + * {@code A1 A2 B}. See also {@link #allowCombinations()}. + * + * @param windowTimes mapping between times and time of the matching window. + * @return The same pattern with a {@link Quantifier#looping(ConsumingStrategy)} quantifier + * applied. 
+ * @throws MalformedPatternException if the quantifier is not applicable to this pattern. + */ +public Pattern oneOrMore(Map windowTimes) { Review Comment: I guess *oneOrMore(Time windowTime)* is enough. ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java: ## @@ -394,14 +447,28 @@ public Pattern times(int times) { * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ public Pattern times(int from, int to) { +return times(from, to, null); +} + +/** + * Specifies that the pattern can occur between from and to times with time interval corresponds + * to the maximum time gap between previous and current event for each times. + * + * @param from number of times matching event must appear at least + * @param to number of times matching event must appear at most + * @param windowTimes mapping between times and time of the matching window. + * @return The same pattern with the number of times range applied + * @throws MalformedPatternException if the quantifier is not applicable to this pattern. + */ +public Pattern times(int from, int to, Map windowTimes) { Review Comment: ditto ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java: ## @@ -174,6 +182,7 @@ void compileFactory() { if (lastPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW +&& !windowTimes.containsKey(lastPattern.getName()) Review Comment: && (!windowTimes.containsKey(lastPattern.getName()) || windowTimes.get(lastPattern.getName()) <= 0) ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java: ## @@ -613,6 +643,7 @@ private Collection computeNextStates( int ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches(); int totalTakeToSkip = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1); +final long stateTimestamp = event.getTimestamp(); Review Comment: Move it into `case TAKE`? 
## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java: ## @@ -41,6 +41,9 @@ public class ComputationState { // Timestamp of the first element in the pattern private final long startTimestamp; +// Timestamp of the previous element in the state +private final long stateTimestamp; Review Comment: ```suggestion private final long previousTimestamp; ``` ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java: ## @@ -91,6 +92,13 @@ */ private final Map>
[jira] [Commented] (FLINK-28210) FlinkSessionJob fails after FlinkDeployment is updated
[ https://issues.apache.org/jira/browse/FLINK-28210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557800#comment-17557800 ] Gyula Fora commented on FLINK-28210: This is expected if HA is not configured for the session FlinkDeployment. Can you share your session yaml? > FlinkSessionJob fails after FlinkDeployment is updated > -- > > Key: FLINK-28210 > URL: https://issues.apache.org/jira/browse/FLINK-28210 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.0.0 > Environment: The [quick > start|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/] > was followed to install minikube and the flink operator. > > minikube 1.24.1 > kubectl 1.24.2 > flink operator: 1.0.0 >Reporter: Daniel Crowe >Priority: Major > > I created a flink deployment using this example: > {code} > curl > https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-session-job.yaml > -o basic-session-job.yaml > kubectl create -f basic-session-job.yaml > {code} > Then, I modified the memory allocated to the jobManager and applied the change > {code} > kubectl apply -f basic-session-job.yaml > {code} > The job manager is restarted to apply the change, but the jobs are not. 
> Looking at the operator logs, it appears that something is failing during job > status observation: > {noformat} > 2022-06-23 03:29:51,189 o.a.f.k.o.c.FlinkSessionJobController [INFO > ][default/basic-session-job-example2] Starting reconciliation > 2022-06-23 03:29:51,190 o.a.f.k.o.o.JobStatusObserver [INFO > ][default/basic-session-job-example2] Observing job status > 2022-06-23 03:29:51,205 o.a.f.k.o.c.FlinkSessionJobController [INFO > ][default/basic-session-job-example] Starting reconciliation > 2022-06-23 03:29:51,206 o.a.f.k.o.o.JobStatusObserver [INFO > ][default/basic-session-job-example] Observing job status > 2022-06-23 03:29:51,208 o.a.f.k.o.c.FlinkDeploymentController [INFO > ][default/basic-session-cluster] Starting reconciliation > 2022-06-23 03:29:51,227 o.a.f.k.o.c.FlinkDeploymentController [INFO > ][default/basic-session-cluster] End of reconciliation > {noformat}
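The comment above points at missing HA configuration as the reason session jobs do not survive a JobManager restart. As a hedged sketch, a session FlinkDeployment with Kubernetes HA enabled might look like the following; the image, version, resource values, and the storage path are assumptions, and only the two `high-availability.*` keys are the point:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-session-cluster
spec:
  image: flink:1.15          # assumption: any operator-supported Flink image
  flinkVersion: v1_15
  serviceAccount: flink
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    # Kubernetes HA services persist job metadata across JobManager restarts,
    # so running session jobs can be recovered after a spec change redeploys
    # the JobManager instead of being lost.
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///flink-data/ha   # assumption: a writable PVC-backed path
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```

Without the HA keys, a JobManager restart wipes the job registry, which matches the observed behavior of the session jobs failing after the deployment update.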
[GitHub] [flink-ml] zhipeng93 commented on pull request #114: [FLINK-27096] Optimize VectorAssembler performance
zhipeng93 commented on PR #114: URL: https://github.com/apache/flink-ml/pull/114#issuecomment-1163919452 @lindong28 @yunfengzhou-hub Thanks for the comments. I have addressed the comments in the latest push
[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #114: [FLINK-27096] Optimize VectorAssembler performance
zhipeng93 commented on code in PR #114: URL: https://github.com/apache/flink-ml/pull/114#discussion_r904510367 ## flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java: ## @@ -80,24 +82,68 @@ public Table[] transform(Table... inputs) { return new Table[] {outputTable}; } -private static class AssemblerFunc implements FlatMapFunction { +private static class AssemblerFunc extends RichFlatMapFunction { private final String[] inputCols; private final String handleInvalid; +/** The indices for assembling vectors. */ +private transient IntArrayList indices; +/** The values for assembling vectors. */ +private transient DoubleArrayList values; Review Comment: Marked resolved since I removed the use of `it.unimi.dsi.fastutil.*`.
[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #114: [FLINK-27096] Optimize VectorAssembler performance
zhipeng93 commented on code in PR #114: URL: https://github.com/apache/flink-ml/pull/114#discussion_r904510188 ## flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java: ## @@ -80,24 +82,68 @@ public Table[] transform(Table... inputs) { return new Table[] {outputTable}; } -private static class AssemblerFunc implements FlatMapFunction { +private static class AssemblerFunc extends RichFlatMapFunction { private final String[] inputCols; private final String handleInvalid; +/** The indices for assembling vectors. */ +private transient IntArrayList indices; +/** The values for assembling vectors. */ +private transient DoubleArrayList values; + public AssemblerFunc(String[] inputCols, String handleInvalid) { this.inputCols = inputCols; this.handleInvalid = handleInvalid; } @Override -public void flatMap(Row value, Collector out) throws Exception { +public void open(Configuration parameters) throws Exception { +super.open(parameters); +indices = new IntArrayList(); +values = new DoubleArrayList(); +} + +@Override +public void flatMap(Row value, Collector out) { +int offset = 0; try { -Object[] objects = new Object[inputCols.length]; -for (int i = 0; i < objects.length; ++i) { -objects[i] = value.getField(inputCols[i]); +for (String inputCol : inputCols) { +Object object = value.getField(inputCol); +Preconditions.checkNotNull(object, "Input column value should not be null."); +if (object instanceof Number) { +indices.add(offset++); +values.add(((Number) object).doubleValue()); +} else if (object instanceof SparseVector) { +SparseVector sparseVector = (SparseVector) object; +for (int i = 0; i < sparseVector.indices.length; ++i) { +indices.add(sparseVector.indices[i] + offset); +values.add(sparseVector.values[i]); +} +offset += sparseVector.size(); +} else if (object instanceof DenseVector) { +DenseVector denseVector = (DenseVector) object; +for (int i = 0; i < denseVector.size(); ++i) { +indices.add(offset + i); 
+values.add(denseVector.values[i]); +} +offset += denseVector.size(); +} else { +throw new IllegalArgumentException( +"Input type has not been supported yet."); +} +} + +Vector assembledVec = +new SparseVector( Review Comment: Marked resolved since I removed the use of `it.unimi.dsi.fastutil.*`.
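The diff above accumulates (index, value) pairs while tracking a running offset across the input columns. A minimal, self-contained sketch of that offset logic (plain Java, no Flink types; `AssembleSketch`, `Result`, and the use of `double[]` as a stand-in for a dense vector are illustrative, not the actual flink-ml API):

```java
import java.util.ArrayList;
import java.util.List;

public class AssembleSketch {

    /** Parallel index/value arrays of the assembled sparse vector. */
    public static class Result {
        public final int[] indices;
        public final double[] values;

        Result(int[] indices, double[] values) {
            this.indices = indices;
            this.values = values;
        }
    }

    /**
     * Assembles scalars and dense blocks into one sparse representation,
     * mirroring the running-offset logic in the diff above.
     */
    public static Result assemble(Object[] columns) {
        List<Integer> indices = new ArrayList<>();
        List<Double> values = new ArrayList<>();
        int offset = 0;
        for (Object col : columns) {
            if (col instanceof Number) {
                // A scalar occupies exactly one slot of the output vector.
                indices.add(offset++);
                values.add(((Number) col).doubleValue());
            } else if (col instanceof double[]) { // stand-in for DenseVector
                double[] dense = (double[]) col;
                for (int i = 0; i < dense.length; i++) {
                    indices.add(offset + i);
                    values.add(dense[i]);
                }
                offset += dense.length;
            } else {
                throw new IllegalArgumentException("Input type has not been supported yet.");
            }
        }
        // Copy the growable buffers into fixed-size parallel arrays.
        int[] idx = new int[indices.size()];
        double[] val = new double[values.size()];
        for (int i = 0; i < idx.length; i++) {
            idx[i] = indices.get(i);
            val[i] = values.get(i);
        }
        return new Result(idx, val);
    }
}
```

For example, assembling a scalar, a two-element dense block, and another scalar yields indices 0..3 with the values laid out in input order.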
[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #114: [FLINK-27096] Optimize VectorAssembler performance
zhipeng93 commented on code in PR #114: URL: https://github.com/apache/flink-ml/pull/114#discussion_r904509535 ## flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java: ## @@ -80,24 +82,68 @@ public Table[] transform(Table... inputs) { return new Table[] {outputTable}; } -private static class AssemblerFunc implements FlatMapFunction { +private static class AssemblerFunc extends RichFlatMapFunction { private final String[] inputCols; private final String handleInvalid; +/** The indices for assembling vectors. */ +private transient IntArrayList indices; Review Comment: Thanks for the comment. I have removed the usage of `IntArrayList` and re-implemented it with two loops. There is no performance regression.
[jira] [Updated] (FLINK-28212) IndexOutOfBoundsException is thrown when project contains window which doesn't refer to all fields of input when using Hive dialect
[ https://issues.apache.org/jira/browse/FLINK-28212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia updated FLINK-28212: - Description: Can be reproduced by the following SQL {code:java} CREATE TABLE alltypesorc( ctinyint TINYINT, csmallint SMALLINT, cint INT, cbigint BIGINT, cfloat FLOAT, cdouble DOUBLE, cstring1 STRING, cstring2 STRING, ctimestamp1 TIMESTAMP, ctimestamp2 TIMESTAMP, cboolean1 BOOLEAN, cboolean2 BOOLEAN); select a.ctinyint, a.cint, count(a.cdouble) over(partition by a.ctinyint order by a.cint desc rows between 1 preceding and 1 following) from alltypesorc {code} Then it will throw the exception "caused by: java.lang.IndexOutOfBoundsException: index (7) must be less than size (1)". The reason is that for such SQL, the Hive dialect generates the following RelNode: {code:java} LogicalSink(table=[*anonymous_collect$1*], fields=[ctinyint, cint, _o__c2]) LogicalProject(ctinyint=[$0], cint=[$2], _o__c2=[$12]) LogicalProject(ctinyint=[$0], csmallint=[$1], cint=[$2], cbigint=[$3], cfloat=[$4], cdouble=[$5], cstring1=[$6], cstring2=[$7], ctimestamp1=[$8], ctimestamp2=[$9], cboolean1=[$10], cboolean2=[$11], _o__col13=[COUNT($5) OVER (PARTITION BY $0 ORDER BY $2 DESC NULLS LAST ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)]) LogicalTableScan(table=[[test-catalog, default, alltypesorc]]) {code} Note: the first LogicalProject (bottom up) contains all fields, and since the "{*}1{*} PRECEDING AND *1* FOLLOWING" bounds of the window are converted to field accesses, the window will look like {code:java} COUNT($5) OVER (PARTITION BY $0 ORDER BY $2 DESC NULLS LAST ROWS BETWEEN $12 PRECEDING AND $12 FOLLOWING{code} But in the rule "ProjectWindowTransposeRule", the unnecessary fields (those not referred to by the top project or the window) are removed, so the input of the window only contains 3 fields (ctinyint, cint, cdouble). Finally, in RelExplainUtil, when explaining the boundString, it cannot find {*}$12{*}, so the exception is thrown.
{code:java} val ref = bound.getOffset.asInstanceOf[RexInputRef] // ref.getIndex will be 12, but the input size of the window is 3 val boundIndex = ref.getIndex - calcOriginInputRows(window) // the window's constants only contain a single element "1" val offset = window.constants.get(boundIndex).getValue2 val offsetKind = if (bound.isPreceding) "PRECEDING" else "FOLLOWING" s"$offset $offsetKind" {code} was: Can be reproduced by following sql {code:java} CREATE TABLE alltypesorc( ctinyint TINYINT, csmallint SMALLINT, cint INT, cbigint BIGINT, cfloat FLOAT, cdouble DOUBLE, cstring1 STRING, cstring2 STRING, ctimestamp1 TIMESTAMP, ctimestamp2 TIMESTAMP, cboolean1 BOOLEAN, cboolean2 BOOLEAN); select a.ctinyint, a.cint, count(a.cdouble) over(partition by a.ctinyint order by a.cint desc rows between 1 preceding and 1 following) from alltypesorc {code} Then it will throw Caused by: java.lang.IndexOutOfBoundsException: index (7) must be less than size (1) > IndexOutOfBoundsException is thrown when project contains window which > doesn't refer to all fields of input when using Hive dialect > > > Key: FLINK-28212 > URL: https://issues.apache.org/jira/browse/FLINK-28212 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Reporter: luoyuxia >Priority: Major > Fix For: 1.16.0 >
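The failing arithmetic in the Scala snippet above can be reduced to one line: the bound constant's `RexInputRef` index is rebased against the window's input field count. A hypothetical sketch of just that arithmetic (field counts below are taken from the plan above and are illustrative):

```java
public class BoundIndexSketch {
    /**
     * boundIndex as computed in RelExplainUtil: the RexInputRef index of the
     * window-bound constant, minus the number of input fields of the window.
     */
    public static int boundIndex(int refIndex, int inputFieldCount) {
        return refIndex - inputFieldCount;
    }
}
```

Before pruning, the window input has 12 fields, so `boundIndex(12, 12) == 0` and `constants.get(0)` resolves the literal `1`. After ProjectWindowTransposeRule shrinks the input to 3 fields without rewriting `$12`, `boundIndex(12, 3) == 9`, which overruns the single-element constants list and produces the IndexOutOfBoundsException.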
[jira] [Created] (FLINK-28213) StreamExecutionEnvironment configure method support override pipeline.jars option
dalongliu created FLINK-28213: - Summary: StreamExecutionEnvironment configure method support override pipeline.jars option Key: FLINK-28213 URL: https://issues.apache.org/jira/browse/FLINK-28213 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration Affects Versions: 1.16.0 Reporter: dalongliu Fix For: 1.16.0
[jira] [Created] (FLINK-28212) IndexOutOfBoundsException is thrown when project contains window which doesn't refer to all fields of input when using Hive dialect
luoyuxia created FLINK-28212: Summary: IndexOutOfBoundsException is thrown when project contains window which doesn't refer to all fields of input when using Hive dialect Key: FLINK-28212 URL: https://issues.apache.org/jira/browse/FLINK-28212 Project: Flink Issue Type: Bug Components: Connectors / Hive Reporter: luoyuxia Fix For: 1.16.0 Can be reproduced by the following SQL {code:java} CREATE TABLE alltypesorc( ctinyint TINYINT, csmallint SMALLINT, cint INT, cbigint BIGINT, cfloat FLOAT, cdouble DOUBLE, cstring1 STRING, cstring2 STRING, ctimestamp1 TIMESTAMP, ctimestamp2 TIMESTAMP, cboolean1 BOOLEAN, cboolean2 BOOLEAN); select a.ctinyint, a.cint, count(a.cdouble) over(partition by a.ctinyint order by a.cint desc rows between 1 preceding and 1 following) from alltypesorc {code} Then it will throw Caused by: java.lang.IndexOutOfBoundsException: index (7) must be less than size (1)
[jira] [Commented] (FLINK-27963) FlinkRuntimeException in KafkaSink causes a Flink job to hang
[ https://issues.apache.org/jira/browse/FLINK-27963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557795#comment-17557795 ] Qingsheng Ren commented on FLINK-27963: --- It will take some time to get this feature released. As this touches the public API, it requires discussion on the mailing list, and a FLIP if we need to implement this as a common feature in the Sink API. [~igaevd] Would you be interested in working on this? > FlinkRuntimeException in KafkaSink causes a Flink job to hang > - > > Key: FLINK-27963 > URL: https://issues.apache.org/jira/browse/FLINK-27963 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.15.0, 1.14.4 >Reporter: Dmytro >Priority: Major > Labels: FlinkRuntimeException, KafkaSink > > If FlinkRuntimeException occurs in the > [KafkaSink|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#kafka-sink] > then the Flink job tries to re-send failed data again and gets into endless > loop "exception->send again" > *Code sample which throws the FlinkRuntimeException:* > {code:java} > int numberOfRows = 1; > int rowsPerSecond = 1; > DataStream stream = environment.addSource( > new DataGeneratorSource<>( > RandomGenerator.stringGenerator(105), // > max.message.bytes=1048588 > rowsPerSecond, > (long) numberOfRows), > TypeInformation.of(String.class)) > .setParallelism(1) > .name("string-generator"); > KafkaSinkBuilder builder = KafkaSink.builder() > .setBootstrapServers("localhost:9092") > .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) > .setRecordSerializer( > > KafkaRecordSerializationSchema.builder().setTopic("test.output") > .setValueSerializationSchema(new > SimpleStringSchema()) > .build()); > KafkaSink sink = builder.build(); > stream.sinkTo(sink).setParallelism(1).name("output-producer"); {code} > *Exception Stack Trace:* > {code:java} > 2022-06-02/14:01:45.066/PDT [flink-akka.actor.default-dispatcher-4] INFO > output-producer: Writer ->
output-producer: Committer (1/1) > (a66beca5a05c1c27691f7b94ca6ac025) switched from RUNNING to FAILED on > 271b1b90-7d6b-4a34-8116-3de6faa8a9bf @ 127.0.0.1 (dataPort=-1). > org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka > null with FlinkKafkaInternalProducer{transactionalId='null', > inTransaction=false, closed=false} at > org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:440) > ~[flink-connector-kafka-1.15.0.jar:1.15.0] at > org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:421) > ~[flink-connector-kafka-1.15.0.jar:1.15.0] at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > ~[flink-streaming-java-1.15.0.jar:1.15.0] at > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > ~[flink-streaming-java-1.15.0.jar:1.15.0] at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) > ~[flink-streaming-java-1.15.0.jar:1.15.0] at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) > ~[flink-streaming-java-1.15.0.jar:1.15.0] at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > ~[flink-streaming-java-1.15.0.jar:1.15.0] at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) > ~[flink-streaming-java-1.15.0.jar:1.15.0] at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) > ~[flink-streaming-java-1.15.0.jar:1.15.0] at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > ~[flink-runtime-1.15.0.jar:1.15.0] at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > ~[flink-runtime-1.15.0.jar:1.15.0] at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > 
~[flink-runtime-1.15.0.jar:1.15.0] at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > ~[flink-runtime-1.15.0.jar:1.15.0] at java.lang.Thread.run(Thread.java:748) > ~[?:1.8.0_292] Caused by: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1050088 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. {code}
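Independent of the hang itself, the immediate trigger in the stack trace is a RecordTooLargeException: the 1050088-byte record exceeds the Kafka producer default `max.request.size` of 1048576. One possible mitigation (the value below is illustrative, and brokers must also permit larger messages via their own size limits) is raising that producer property in the configuration handed to the sink, e.g. through the builder's producer-config setter:

```java
import java.util.Properties;

public class ProducerProps {
    /** Producer overrides allowing records larger than the 1 MiB default. */
    public static Properties oversizedRecordWorkaround() {
        Properties props = new Properties();
        // 2 MiB: comfortably above the 1050088-byte record from the trace.
        props.setProperty("max.request.size", String.valueOf(2 * 1024 * 1024));
        return props;
    }
}
```

These properties would then be passed when building the sink; whether a larger limit is acceptable depends on broker-side settings such as `message.max.bytes`.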
[jira] [Closed] (FLINK-28211) Rename Schema to TableSchema
[ https://issues.apache.org/jira/browse/FLINK-28211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-28211. Resolution: Fixed master: 5a34efce8fadf96fc05194b28f62f0680a5afa62 > Rename Schema to TableSchema > > > Key: FLINK-28211 > URL: https://issues.apache.org/jira/browse/FLINK-28211 > Project: Flink > Issue Type: Improvement > Components: Table Store >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.2.0 > > > Some systems use "schema" as a database-level concept, so the Schema class > is very confusing in this case; it is better to rename it to TableSchema.
[GitHub] [flink-table-store] JingsongLi closed pull request #172: [FLINK-28211] Rename Schema to TableSchema
JingsongLi closed pull request #172: [FLINK-28211] Rename Schema to TableSchema URL: https://github.com/apache/flink-table-store/pull/172
[jira] [Updated] (FLINK-28211) Rename Schema to TableSchema
[ https://issues.apache.org/jira/browse/FLINK-28211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28211: --- Labels: pull-request-available (was: ) > Rename Schema to TableSchema > > > Key: FLINK-28211 > URL: https://issues.apache.org/jira/browse/FLINK-28211 > Project: Flink > Issue Type: Improvement > Components: Table Store >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.2.0 > > > Some systems use "schema" as a database-level concept, so the Schema class > is very confusing in this case; it is better to rename it to TableSchema.
[GitHub] [flink-table-store] JingsongLi opened a new pull request, #172: [FLINK-28211] Rename Schema to TableSchema
JingsongLi opened a new pull request, #172: URL: https://github.com/apache/flink-table-store/pull/172 Some systems use "schema" as a database-level concept, so the Schema class is very confusing in this case; it is better to rename it to TableSchema.
[jira] [Assigned] (FLINK-28126) Iteration gets stuck when replayable datastream and its downstream operator have different parallelism
[ https://issues.apache.org/jira/browse/FLINK-28126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Gao reassigned FLINK-28126: --- Assignee: Yun Gao > Iteration gets stuck when replayable datastream and its downstream operator > have different parallelism > -- > > Key: FLINK-28126 > URL: https://issues.apache.org/jira/browse/FLINK-28126 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.0.0 >Reporter: Xuannan Su >Assignee: Yun Gao >Priority: Major > > Iteration gets stuck when replayable datastream and its downstream operator > have different parallelism. It can be reproduced with the following code > snippet. > {code:java} > @Test > public void testIteration() throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > final SingleOutputStreamOperator variable = > env.fromElements(0).name("i"); > final SingleOutputStreamOperator data = env.fromElements(1, > 2).name("inc") > .map(x -> x).setParallelism(1); // test can pass if > parallelism is 2. 
> final IterationConfig config = IterationConfig.newBuilder().build(); > Iterations.iterateBoundedStreamsUntilTermination( > DataStreamList.of(variable), > ReplayableDataStreamList.replay(data), > config, > (IterationBody) (variableStreams, dataStreams) -> { > final DataStream sample = dataStreams.get(0); > final SingleOutputStreamOperator trainOutput = > sample > .transform( > "iter", > TypeInformation.of(Integer.class), > new IterTransform()) > .setParallelism(2) > .map((MapFunction) > integer -> integer) > .setParallelism(1); > return new IterationBodyResult( > DataStreamList.of(trainOutput), > DataStreamList.of(trainOutput)); > }); > env.execute(); > } > public static class IterTransform extends AbstractStreamOperator > implements OneInputStreamOperator, > IterationListener { > @Override > public void processElement(StreamRecord element) throws > Exception { > LOG.info("Processing element: {}", element); > } > @Override > public void onEpochWatermarkIncremented( > int epochWatermark, Context context, Collector > collector) > throws Exception { > LOG.info("onEpochWatermarkIncremented: {}", epochWatermark); > if (epochWatermark >= 10) { > return; > } > collector.collect(0); > } > @Override > public void onIterationTerminated(Context context, Collector > collector) > throws Exception { > LOG.info("onIterationTerminated"); > } > } > {code} > After digging into the code, I found that the `ReplayOperator` doesn't emit > the epoch watermark with a broadcast output. [~gaoyunhaii], could you look to > see if this is the case? -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-28211) Rename Schema to TableSchema
Jingsong Lee created FLINK-28211: Summary: Rename Schema to TableSchema Key: FLINK-28211 URL: https://issues.apache.org/jira/browse/FLINK-28211 Project: Flink Issue Type: Improvement Components: Table Store Reporter: Jingsong Lee Assignee: Jingsong Lee Fix For: table-store-0.2.0 Some systems use "schema" as a database-level concept, so the Schema class is very confusing in this case; it is better to rename it to TableSchema.
[jira] [Commented] (FLINK-28126) Iteration gets stuck when replayable datastream and its downstream operator have different parallelism
[ https://issues.apache.org/jira/browse/FLINK-28126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557792#comment-17557792 ] Yun Gao commented on FLINK-28126: - Very thanks [~xuannan] for the investigation, I'll have a look~ > Iteration gets stuck when replayable datastream and its downstream operator > have different parallelism > -- > > Key: FLINK-28126 > URL: https://issues.apache.org/jira/browse/FLINK-28126 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning >Affects Versions: ml-2.0.0 >Reporter: Xuannan Su >Assignee: Yun Gao >Priority: Major > > Iteration gets stuck when replayable datastream and its downstream operator > have different parallelism. It can be reproduced with the following code > snippet. > {code:java} > @Test > public void testIteration() throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > final SingleOutputStreamOperator variable = > env.fromElements(0).name("i"); > final SingleOutputStreamOperator data = env.fromElements(1, > 2).name("inc") > .map(x -> x).setParallelism(1); // test can pass if > parallelism is 2. 
> final IterationConfig config = IterationConfig.newBuilder().build(); > Iterations.iterateBoundedStreamsUntilTermination( > DataStreamList.of(variable), > ReplayableDataStreamList.replay(data), > config, > (IterationBody) (variableStreams, dataStreams) -> { > final DataStream sample = dataStreams.get(0); > final SingleOutputStreamOperator trainOutput = > sample > .transform( > "iter", > TypeInformation.of(Integer.class), > new IterTransform()) > .setParallelism(2) > .map((MapFunction) > integer -> integer) > .setParallelism(1); > return new IterationBodyResult( > DataStreamList.of(trainOutput), > DataStreamList.of(trainOutput)); > }); > env.execute(); > } > public static class IterTransform extends AbstractStreamOperator > implements OneInputStreamOperator, > IterationListener { > @Override > public void processElement(StreamRecord element) throws > Exception { > LOG.info("Processing element: {}", element); > } > @Override > public void onEpochWatermarkIncremented( > int epochWatermark, Context context, Collector > collector) > throws Exception { > LOG.info("onEpochWatermarkIncremented: {}", epochWatermark); > if (epochWatermark >= 10) { > return; > } > collector.collect(0); > } > @Override > public void onIterationTerminated(Context context, Collector > collector) > throws Exception { > LOG.info("onIterationTerminated"); > } > } > {code} > After digging into the code, I found that the `ReplayOperator` doesn't emit > the epoch watermark with a broadcast output. [~gaoyunhaii], could you look to > see if this is the case? -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink] flinkbot commented on pull request #20054: [FLINK-28135][runtime] Introduce SlowTaskDetector
flinkbot commented on PR #20054: URL: https://github.com/apache/flink/pull/20054#issuecomment-1163888460 ## CI report: * 99cadbcf9a3e396ce05525feeb53f6fea17943cf UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-28135) Introduce SlowTaskDetector
[ https://issues.apache.org/jira/browse/FLINK-28135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28135: --- Labels: pull-request-available (was: ) > Introduce SlowTaskDetector > -- > > Key: FLINK-28135 > URL: https://issues.apache.org/jira/browse/FLINK-28135 > Project: Flink > Issue Type: Sub-task >Reporter: Zhu Zhu >Assignee: wangwj >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > A SlowTaskDetector will periodically check all the current tasks/executions > and notify the SlowTaskDetectorListener about the detected slow tasks. > SpeculativeScheduler will register itself as the SlowTaskDetectorListener.
[GitHub] [flink] 6591812 opened a new pull request, #20054: [FLINK-28135][runtime] Introduce SlowTaskDetector
6591812 opened a new pull request, #20054: URL: https://github.com/apache/flink/pull/20054 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[jira] [Created] (FLINK-28210) FlinkSessionJob fails after FlinkDeployment is updated
Daniel Crowe created FLINK-28210: Summary: FlinkSessionJob fails after FlinkDeployment is updated Key: FLINK-28210 URL: https://issues.apache.org/jira/browse/FLINK-28210 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.0.0 Environment: The [quick start|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/] was followed to install minikube and the flink operator. minikube 1.24.1 kubectl 1.24.2 flink operator: 1.0.0 Reporter: Daniel Crowe I created a flink deployment using this example: {code} curl https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-session-job.yaml -o basic-session-job.yaml kubectl create -f basic-session-job.yaml {code} Then, I modified the memory allocated to the jobManager and applied the change {code} kubectl apply -f basic-session-job.yaml {code} The job manager is restarted to apply the change, but the jobs are not. Looking at the operator logs, it appears that something is failing during job status observation: {noformat} 2022-06-23 03:29:51,189 o.a.f.k.o.c.FlinkSessionJobController [INFO ][default/basic-session-job-example2] Starting reconciliation 2022-06-23 03:29:51,190 o.a.f.k.o.o.JobStatusObserver [INFO ][default/basic-session-job-example2] Observing job status 2022-06-23 03:29:51,205 o.a.f.k.o.c.FlinkSessionJobController [INFO ][default/basic-session-job-example] Starting reconciliation 2022-06-23 03:29:51,206 o.a.f.k.o.o.JobStatusObserver [INFO ][default/basic-session-job-example] Observing job status 2022-06-23 03:29:51,208 o.a.f.k.o.c.FlinkDeploymentController [INFO ][default/basic-session-cluster] Starting reconciliation 2022-06-23 03:29:51,227 o.a.f.k.o.c.FlinkDeploymentController [INFO ][default/basic-session-cluster] End of reconciliation {noformat}
[jira] [Updated] (FLINK-28209) KafkaSink with EXACTLY_ONCE produces duplicate data (flink kafka connector 1.14.4)
[ https://issues.apache.org/jira/browse/FLINK-28209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tanyao updated FLINK-28209: --- Description: I'm trying to read MySQL binlog and transport it to Kafka. Here is what I'm using: *Flink: 1.14.4* *Flink-CDC: 2.2* *Kafka: CDH 6.2 (2.1)* *Stage-1:* mysql-cdc-connector was used to consume the MySQL binlog. About 400,000 (40W) rows changed when I executed some SQL in MySQL, and I received those rows without any data loss or duplication, exactly the same number as MySQL changed. So I don't think CDC is the problem. Stage-2: when I got the binlog data, I first deserialized it to Tuple2, where tuple2.f0 has the format "db.table" (I intend to use it as the Kafka topic for each distinct db.table) and tuple2.f1 contains the binlog value only. *Stage-3:* then I used KafkaSink (introduced in Flink 1.14) to write the binlog to different Kafka topics as tuple2.f0 indicated. The code looks like this: !image-2022-06-23-10-49-01-213.png! As you can see, I just want to use EXACTLY_ONCE semantics, but here is the problem: after waiting about 10 minutes for all the binlog to be consumed, I checked all data in a single Kafka topic (just one topic); the total number of rows is bigger than the number of binlog rows from the MySQL data change, because a lot of duplicated data was written to Kafka. For example: !image-2022-06-23-10-58-15-141.png! Stage-4: however, when I changed EXACTLY_ONCE to AT_LEAST_ONCE, everything worked very well, with no more duplicated data in Kafka. So I'm wondering: is there any bug in KafkaSink when EXACTLY_ONCE is configured? Can anybody help? Hope for your answer sincerely. was: I'm trying to read mysql binlog and transport it to kafka; here is what i'm using : *Flink: 1.14.4* *Flink-CDC : 2.2* *Kafka: CDH6.2(2.1)* *Stage-1:* mysql-cdc-connector was used to consume mysql binlog data .
about 40W rows changed when i executed some sql in mysql, and i can get those 40W rows without any data lose or reduplicate, just the some number as mysql changed . So, i don't think cdc is the problem. Stage-2: when i got binlog data, first i deserialized it to type of Tuple2, which tuple2.f0 has the format "db.table" and i intend to use it as kafka topic for every different db.table, tuple2.f1 contains binlog value only. *Stage-3:* then, i used KafkaSink (which was introduced in flink 1.14) to write binlog to different kafka topic as tuple2.f0 indicated. Here is the code like : !image-2022-06-23-10-49-01-213.png! As u can see, I just want to use EXACTLY_ONCE semantics,but here is the problem: after about 10mins waiting for all binlog consumed, i checked all data in a single kafka topic (just one topic ), the total number of rows is bigger than the number of binlog rows from mysql data changed, because too many reduplicated data sink to kafka. For example !image-2022-06-23-10-58-15-141.png! Stage-4: however, when i changed EXACTLY_ONCE. to. AT_LEAST_ONCE, everything worked very well, no more reduplicated data in kafka. So i'm wonderring , is there any bug in KafkaSink when EXACTLY_ONCE is configured. > KafkaSink with EXACTLY_ONCE produce reduplicate data(flink kafka > connector1.14.4) > -- > > Key: FLINK-28209 > URL: https://issues.apache.org/jira/browse/FLINK-28209 > Project: Flink > Issue Type: Bug >Affects Versions: 1.14.4 >Reporter: tanyao >Priority: Major > Attachments: image-2022-06-23-10-49-01-213.png, > image-2022-06-23-10-58-15-141.png > > > I'm trying to read mysql binlog and transport it to kafka; > here is what i'm using : > *Flink: 1.14.4* > *Flink-CDC : 2.2* > *Kafka: CDH6.2(2.1)* > > *Stage-1:* > mysql-cdc-connector was used to consume mysql binlog data . about 40W rows > changed when i executed some sql in mysql, and i can get those 40W rows > without any data lose or reduplicate, just the some number as mysql changed . 
> So, i don't think cdc is the problem. > > Stage-2: > when i got binlog data, first i deserialized it to type of > Tuple2, which tuple2.f0 has the format "db.table" and i > intend to use it as kafka topic for every different db.table, tuple2.f1 > contains binlog value only. > > *Stage-3:* > then, i used KafkaSink (which was introduced in flink 1.14) to write binlog > to different kafka topic as tuple2.f0 indicated. > Here is the code like : > !image-2022-06-23-10-49-01-213.png! > > As u can see, I just want to use EXACTLY_ONCE semantics,but here is the > problem: > after about 10mins waiting for all binlog consumed, i checked all data in a > single kafka topic (just one topic ), the total number of rows is bigger > than the number of binlog rows from mysql data changed, because too many
[jira] [Created] (FLINK-28209) KafkaSink with EXACTLY_ONCE produce reduplicate data(flink kafka connector1.14.4)
tanyao created FLINK-28209: -- Summary: KafkaSink with EXACTLY_ONCE produce reduplicate data(flink kafka connector1.14.4) Key: FLINK-28209 URL: https://issues.apache.org/jira/browse/FLINK-28209 Project: Flink Issue Type: Bug Affects Versions: 1.14.4 Reporter: tanyao Attachments: image-2022-06-23-10-49-01-213.png, image-2022-06-23-10-58-15-141.png

I'm trying to read MySQL binlog data and transport it to Kafka. Here is what I'm using: *Flink: 1.14.4* *Flink CDC: 2.2* *Kafka: CDH 6.2 (Kafka 2.1)*

*Stage 1:* The mysql-cdc connector was used to consume MySQL binlog data. About 400,000 rows changed when I executed some SQL statements in MySQL, and I received exactly those 400,000 rows without any loss or duplication: the same number of changes as in MySQL. So I don't think the CDC connector is the problem.

*Stage 2:* After receiving the binlog data, I first deserialized it into a Tuple2, where tuple2.f0 has the format "db.table" (which I intend to use as the Kafka topic for each db.table) and tuple2.f1 contains only the binlog value.

*Stage 3:* Then I used KafkaSink (introduced in Flink 1.14) to write the binlog to the Kafka topic indicated by tuple2.f0. The code looks like this: !image-2022-06-23-10-49-01-213.png! As you can see, I just want to use EXACTLY_ONCE semantics, but here is the problem: after waiting about 10 minutes for all the binlog to be consumed, I checked all the data in a single Kafka topic (just one topic). The total number of rows was much larger than the number of changed binlog rows in MySQL, because a lot of duplicated data was written to Kafka. For example: !image-2022-06-23-10-58-15-141.png!

*Stage 4:* However, when I changed EXACTLY_ONCE to AT_LEAST_ONCE, everything worked well: no more duplicated data in Kafka. So I'm wondering, is there a bug in KafkaSink when EXACTLY_ONCE is configured?

-- This message was sent by Atlassian Jira (v8.20.7#820007)
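A likely explanation for the symptom described above (though not confirmed in this thread) is transactional visibility rather than a sink bug: with EXACTLY_ONCE, KafkaSink writes records inside Kafka transactions that are committed only when a checkpoint completes, and a consumer using the default `isolation.level=read_uncommitted` also counts records from open or aborted transactions (e.g. from restarted attempts), which shows up as "duplicates"; `read_committed` hides them. The toy model below simulates that visibility rule in plain Java — it is a sketch of the concept, not the Kafka client API:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of Kafka transactional visibility. Records written in a
// transaction are visible to a read_uncommitted reader immediately, but a
// read_committed reader only sees records whose transaction was committed.
public class IsolationLevelDemo {
    static class LogEntry {
        final String value;
        final int txnId;
        LogEntry(String value, int txnId) { this.value = value; this.txnId = txnId; }
    }

    static final List<LogEntry> log = new ArrayList<>();
    static final List<Integer> committedTxns = new ArrayList<>();

    static void writeInTxn(int txnId, String... values) {
        for (String v : values) log.add(new LogEntry(v, txnId));
    }

    static void commit(int txnId) { committedTxns.add(txnId); }

    // read_uncommitted sees every record; read_committed sees only records
    // whose transaction has a commit marker.
    static List<String> read(boolean readCommitted) {
        List<String> out = new ArrayList<>();
        for (LogEntry e : log) {
            if (!readCommitted || committedTxns.contains(e.txnId)) out.add(e.value);
        }
        return out;
    }

    public static void main(String[] args) {
        log.clear();
        committedTxns.clear();
        writeInTxn(1, "row-1", "row-2");
        commit(1);                        // checkpoint completed -> transaction committed
        writeInTxn(2, "row-1", "row-2");  // attempt failed before commit -> aborted
        writeInTxn(3, "row-1", "row-2");
        commit(3);                        // retried attempt committed
        System.out.println("read_uncommitted sees " + read(false).size() + " rows");
        System.out.println("read_committed sees " + read(true).size() + " rows");
    }
}
```

In this model the uncommitted view counts the aborted attempt's rows while the committed view does not, which matches EXACTLY_ONCE looking "duplicated" to an uncommitted reader and correct to a committed one.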
[GitHub] [flink] PatrickRen closed pull request #17804: [FLINK-24376][runtime] Use operator name for constructing OperatorCoordinatorProvider instead of chained name
PatrickRen closed pull request #17804: [FLINK-24376][runtime] Use operator name for constructing OperatorCoordinatorProvider instead of chained name URL: https://github.com/apache/flink/pull/17804 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] PatrickRen merged pull request #19904: [FLINK-27865][docs] Add example for configuring SASL and SSL in Kafka DataStream and SQL connector
PatrickRen merged PR #19904: URL: https://github.com/apache/flink/pull/19904 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-28187) Duplicate job submission for FlinkSessionJob
[ https://issues.apache.org/jira/browse/FLINK-28187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557782#comment-17557782 ] Aitozi commented on FLINK-28187: [~gyfora] I added a comment here: [https://github.com/apache/flink-kubernetes-operator/commit/ab59d6eb980512775590d0d01e697fe0c28d1b3b#r76767242]

> Duplicate job submission for FlinkSessionJob
>
> Key: FLINK-28187
> URL: https://issues.apache.org/jira/browse/FLINK-28187
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.0.0
> Reporter: Jeesmon Jacob
> Priority: Critical
> Attachments: flink-operator-log.txt
>
> During a session job submission, if a deployment error (e.g. concurrent.TimeoutException) is hit, the operator will submit the job again. But the first submission could have succeeded on the JobManager side, and the second submission could then result in a duplicate job. Operator log attached.
> Per [~gyfora]:
> The problem is that in case a deployment error was hit, the SessionJobObserver will not be able to tell whether it has submitted the job or not. So it will simply try to submit it again. We have to find a mechanism to correlate Jobs on the cluster with the SessionJob CR itself. Maybe we could override the job name itself for this purpose or something like that.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (FLINK-28205) memory leak in the timing flush of the jdbc-connector
[ https://issues.apache.org/jira/browse/FLINK-28205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] michaelxiang updated FLINK-28205: - Summary: memory leak in the timing flush of the jdbc-connector (was: memory leak in the timing refresh of the jdbc-connector)

> memory leak in the timing flush of the jdbc-connector
> -
>
> Key: FLINK-28205
> URL: https://issues.apache.org/jira/browse/FLINK-28205
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 1.15.0, 1.13.6, 1.14.5
> Reporter: michaelxiang
> Priority: Major
>
> Bug position: org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.scheduler
>
> When writing with the JDBC connector, the RuntimeException thrown by the scheduled thread that flushes buffered records is caught, so the Flink task does not fail until new data arrives. During this time, the scheduled thread keeps wrapping the previous flushException in a newly created RuntimeException. None of these flushException references can be released and reclaimed by the GC, resulting in a memory leak.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Reopened] (FLINK-28205) memory leak in the timing refresh of the jdbc-connector
[ https://issues.apache.org/jira/browse/FLINK-28205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] michaelxiang reopened FLINK-28205: -- I changed the language (rewrote the description in English).

> memory leak in the timing refresh of the jdbc-connector
> ---
>
> Key: FLINK-28205
> URL: https://issues.apache.org/jira/browse/FLINK-28205
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 1.15.0, 1.13.6, 1.14.5
> Reporter: michaelxiang
> Priority: Major
>
> Bug position: org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.scheduler
>
> When writing with the JDBC connector, the RuntimeException thrown by the scheduled thread that flushes buffered records is caught, so the Flink task does not fail until new data arrives. During this time, the scheduled thread keeps wrapping the previous flushException in a newly created RuntimeException. None of these flushException references can be released and reclaimed by the GC, resulting in a memory leak.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink-ml] lindong28 commented on a diff in pull request #114: [FLINK-27096] Optimize VectorAssembler performance
lindong28 commented on code in PR #114: URL: https://github.com/apache/flink-ml/pull/114#discussion_r904462286 ## flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java: ## @@ -80,24 +82,68 @@ public Table[] transform(Table... inputs) { return new Table[] {outputTable}; } -private static class AssemblerFunc implements FlatMapFunction { +private static class AssemblerFunc extends RichFlatMapFunction { private final String[] inputCols; private final String handleInvalid; +/** The indices for assembling vectors. */ +private transient IntArrayList indices; +/** The values for assembling vectors. */ +private transient DoubleArrayList values; + public AssemblerFunc(String[] inputCols, String handleInvalid) { this.inputCols = inputCols; this.handleInvalid = handleInvalid; } @Override -public void flatMap(Row value, Collector out) throws Exception { +public void open(Configuration parameters) throws Exception { +super.open(parameters); +indices = new IntArrayList(); +values = new DoubleArrayList(); +} + +@Override +public void flatMap(Row value, Collector out) { +int offset = 0; try { -Object[] objects = new Object[inputCols.length]; -for (int i = 0; i < objects.length; ++i) { -objects[i] = value.getField(inputCols[i]); +for (String inputCol : inputCols) { +Object object = value.getField(inputCol); +Preconditions.checkNotNull(object, "Input column value should not be null."); +if (object instanceof Number) { +indices.add(offset++); +values.add(((Number) object).doubleValue()); +} else if (object instanceof SparseVector) { +SparseVector sparseVector = (SparseVector) object; +for (int i = 0; i < sparseVector.indices.length; ++i) { +indices.add(sparseVector.indices[i] + offset); +values.add(sparseVector.values[i]); +} +offset += sparseVector.size(); +} else if (object instanceof DenseVector) { +DenseVector denseVector = (DenseVector) object; +for (int i = 0; i < denseVector.size(); ++i) { +indices.add(offset + i); 
+values.add(denseVector.values[i]); +} +offset += denseVector.size(); +} else { +throw new IllegalArgumentException( +"Input type has not been supported yet."); +} +} + +Vector assembledVec = +new SparseVector( Review Comment: If the performance of using `it.unimi.dsi.fastutil.*` is not considerably faster than using a for loop to construct int[] and double[], it seems simpler to still use a for-loop instead of introducing an extra library dependency. ## flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java: ## @@ -80,24 +82,68 @@ public Table[] transform(Table... inputs) { return new Table[] {outputTable}; } -private static class AssemblerFunc implements FlatMapFunction { +private static class AssemblerFunc extends RichFlatMapFunction { private final String[] inputCols; private final String handleInvalid; +/** The indices for assembling vectors. */ +private transient IntArrayList indices; +/** The values for assembling vectors. */ +private transient DoubleArrayList values; Review Comment: It seems simpler to instantiate indices and values as local variables in the `flatMap()` method. Does re-creating these two variables for each call in the flapMap() have non-trivial performance impact? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
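The review above suggests dropping the fastutil dependency in favor of plain structures. Below is a self-contained sketch of the assembly logic under that suggestion; `SparseVec` and `DenseVec` are simplified stand-ins for Flink ML's `SparseVector`/`DenseVector`, and `assemble` stands in for the `flatMap` body, so this illustrates the approach rather than the actual PR code:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the VectorAssembler logic using plain ArrayLists instead of
// fastutil's IntArrayList/DoubleArrayList, copying into primitive arrays
// once at the end as the review proposes.
public class AssemblerSketch {
    static class SparseVec {
        final int size; final int[] indices; final double[] values;
        SparseVec(int size, int[] indices, double[] values) {
            this.size = size; this.indices = indices; this.values = values;
        }
    }
    static class DenseVec {
        final double[] values;
        DenseVec(double[] values) { this.values = values; }
    }

    static SparseVec assemble(Object... columns) {
        List<Integer> indices = new ArrayList<>();
        List<Double> values = new ArrayList<>();
        int offset = 0;  // running logical position across input columns
        for (Object col : columns) {
            if (col instanceof Number) {
                indices.add(offset++);
                values.add(((Number) col).doubleValue());
            } else if (col instanceof SparseVec) {
                SparseVec sv = (SparseVec) col;
                for (int i = 0; i < sv.indices.length; i++) {
                    indices.add(sv.indices[i] + offset);
                    values.add(sv.values[i]);
                }
                offset += sv.size;
            } else if (col instanceof DenseVec) {
                DenseVec dv = (DenseVec) col;
                for (int i = 0; i < dv.values.length; i++) {
                    indices.add(offset + i);
                    values.add(dv.values[i]);
                }
                offset += dv.values.length;
            } else {
                throw new IllegalArgumentException("Unsupported input type");
            }
        }
        // Single copy into primitive arrays, avoiding the extra dependency.
        int[] idx = new int[indices.size()];
        double[] val = new double[values.size()];
        for (int i = 0; i < idx.length; i++) {
            idx[i] = indices.get(i);
            val[i] = values.get(i);
        }
        return new SparseVec(offset, idx, val);
    }

    public static void main(String[] args) {
        SparseVec v = assemble(1.0,
                new DenseVec(new double[] {2.0, 3.0}),
                new SparseVec(4, new int[] {1, 3}, new double[] {4.0, 5.0}));
        System.out.println("assembled size=" + v.size);
    }
}
```

Whether boxed `ArrayList`s are fast enough compared to fastutil's primitive lists is exactly the performance question raised in the review; the trade-off here is one less library dependency.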
[jira] [Updated] (FLINK-28205) memory leak in the timing refresh of the jdbc-connector
[ https://issues.apache.org/jira/browse/FLINK-28205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] michaelxiang updated FLINK-28205: - Description: Bug position: org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.scheduler

When writing with the JDBC connector, the RuntimeException thrown by the scheduled thread that flushes buffered records is caught, so the Flink task does not fail until new data arrives. During this time, the scheduled thread keeps wrapping the previous flushException in a newly created RuntimeException. None of these flushException references can be released and reclaimed by the GC, resulting in a memory leak.

was: Class path: org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat. Bug location: the open() method's scheduled-thread Runnable instance. When writing with flink-connector-jdbc, the RuntimeException raised when the scheduled flush fails is caught; this means the task does not fail out before new data arrives, so the scheduled thread keeps wrapping the previously created flushException in a new RuntimeException, and the flushException references can never be released and reclaimed by the GC, causing a memory leak.

Summary: memory leak in the timing refresh of the jdbc-connector (was: jdbc connector scheduled flush has a memory leak bug)

> memory leak in the timing refresh of the jdbc-connector
> ---
>
> Key: FLINK-28205
> URL: https://issues.apache.org/jira/browse/FLINK-28205
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 1.15.0, 1.13.6, 1.14.5
> Reporter: michaelxiang
> Priority: Major
-- This message was sent by Atlassian Jira (v8.20.7#820007)
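The wrapping pattern described in FLINK-28205 can be reproduced in isolation. The toy sketch below is not the actual JdbcBatchingOutputFormat code: the leaky variant re-wraps the previous flushException on every scheduled tick, so the cause chain (and everything the exceptions reference, such as stack traces) stays reachable and grows without bound until the task finally fails; the fixed variant records the first failure once and keeps the chain at depth one.

```java
// Toy reproduction of the flushException wrapping leak (illustrative only).
public class FlushLeakDemo {
    static Throwable flushException;

    // Leaky variant: every failed tick re-wraps the previous exception,
    // so the whole cause chain stays strongly reachable.
    static void leakyTick() {
        flushException = new RuntimeException("flush failed", flushException);
    }

    // Fixed variant: record the first failure once; later ticks keep it.
    static void fixedTick() {
        if (flushException == null) {
            flushException = new RuntimeException("flush failed");
        }
    }

    // Walk the getCause() chain and count its length.
    static int chainDepth(Throwable t) {
        int depth = 0;
        for (Throwable c = t; c != null; c = c.getCause()) depth++;
        return depth;
    }

    public static void main(String[] args) {
        flushException = null;
        for (int i = 0; i < 1000; i++) leakyTick();
        System.out.println("leaky chain depth after 1000 ticks: " + chainDepth(flushException));

        flushException = null;
        for (int i = 0; i < 1000; i++) fixedTick();
        System.out.println("fixed chain depth after 1000 ticks: " + chainDepth(flushException));
    }
}
```

The leaky chain depth equals the number of failed ticks, which is why memory usage keeps climbing while the flush keeps failing silently.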
[jira] [Closed] (FLINK-28129) Add documentation for rewrite data layout after scaling bucket number
[ https://issues.apache.org/jira/browse/FLINK-28129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-28129. Resolution: Fixed master: df78da0e428467af3153984199300f049c22878f > Add documentation for rewrite data layout after scaling bucket number > - > > Key: FLINK-28129 > URL: https://issues.apache.org/jira/browse/FLINK-28129 > Project: Flink > Issue Type: Sub-task > Components: Table Store >Affects Versions: table-store-0.2.0 >Reporter: Jane Chan >Assignee: Jane Chan >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.2.0 > > > We should add a thorough doc on > * How to rescale data layout after changing bucket number. > * The current limitation on rescaling partitions for the log system. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink-table-store] JingsongLi merged pull request #167: [FLINK-28129] Add documentation for rescale bucket
JingsongLi merged PR #167: URL: https://github.com/apache/flink-table-store/pull/167 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] Myasuka commented on pull request #20028: [hotfix][docs] Remove redundant "." from sample code
Myasuka commented on PR #20028: URL: https://github.com/apache/flink/pull/20028#issuecomment-1163855718 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-25100) RMQSourceITCase failed on azure due to java.io.EOFException
[ https://issues.apache.org/jira/browse/FLINK-25100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557768#comment-17557768 ] Huang Xingbo commented on FLINK-25100: -- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37080=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d > RMQSourceITCase failed on azure due to java.io.EOFException > --- > > Key: FLINK-25100 > URL: https://issues.apache.org/jira/browse/FLINK-25100 > Project: Flink > Issue Type: Bug > Components: Connectors/ RabbitMQ >Affects Versions: 1.15.0, 1.14.1, 1.13.6, 1.14.3 >Reporter: Yun Gao >Priority: Major > Labels: auto-deprioritized-major, test-stability > > {code:java} > Nov 29 12:02:05 [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, > Time elapsed: 55.545 s <<< FAILURE! - in > org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase > Nov 29 12:02:05 [ERROR] testStopWithSavepoint Time elapsed: 15.014 s <<< > ERROR! > Nov 29 12:02:05 com.rabbitmq.client.PossibleAuthenticationFailureException: > Possibly caused by authentication failure > Nov 29 12:02:05 at > com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:388) > Nov 29 12:02:05 at > com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:64) > Nov 29 12:02:05 at > com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:156) > Nov 29 12:02:05 at > com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1130) > Nov 29 12:02:05 at > com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1087) > Nov 29 12:02:05 at > com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1045) > Nov 29 12:02:05 at > com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1207) > Nov 29 12:02:05 at > 
org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase.getRMQConnection(RMQSourceITCase.java:201) > Nov 29 12:02:05 at > org.apache.flink.streaming.connectors.rabbitmq.RMQSourceITCase.setUp(RMQSourceITCase.java:96) > Nov 29 12:02:05 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Nov 29 12:02:05 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Nov 29 12:02:05 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Nov 29 12:02:05 at java.lang.reflect.Method.invoke(Method.java:498) > Nov 29 12:02:05 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > Nov 29 12:02:05 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > Nov 29 12:02:05 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > Nov 29 12:02:05 at > org.junit.internal.runners.statements.RunBefores.invokeMethod(RunBefores.java:33) > Nov 29 12:02:05 at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24) > Nov 29 12:02:05 at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > Nov 29 12:02:05 at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > Nov 29 12:02:05 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > Nov 29 12:02:05 at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > Nov 29 12:02:05 at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > Nov 29 12:02:05 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > Nov 29 12:02:05 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > Nov 29 12:02:05 at > org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > Nov 29 12:02:05 at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > Nov 29 12:02:05 at > 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > Nov 29 12:02:05 at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > Nov 29 12:02:05 at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > Nov 29 12:02:05 at > org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:30) > Nov 29 12:02:05 at org.junit.rules.RunRules.evaluate(RunRules.java:20) > Nov 29 12:02:05 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > Nov 29 12:02:05 at > org.junit.runners.ParentRunner.run(ParentRunner.java:413) > {code} >
[jira] [Commented] (FLINK-28173) Multiple Parquet format tests are failing with NoSuchMethodError
[ https://issues.apache.org/jira/browse/FLINK-28173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557764#comment-17557764 ] Huang Xingbo commented on FLINK-28173: -- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37079=logs=7e3d33c3-a462-5ea8-98b8-27e1aafe4ceb=ef77f8d1-44c8-5ee2-f175-1c88f61de8c0=16746 > Multiple Parquet format tests are failing with NoSuchMethodError > > > Key: FLINK-28173 > URL: https://issues.apache.org/jira/browse/FLINK-28173 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.16.0 >Reporter: Martijn Visser >Assignee: jia liu >Priority: Critical > Labels: pull-request-available, test-stability > > {code:java} > Jun 21 02:44:38 java.lang.NoSuchMethodError: > com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V > Jun 21 02:44:38 at > org.apache.hadoop.conf.Configuration.set(Configuration.java:1357) > Jun 21 02:44:38 at > org.apache.hadoop.conf.Configuration.set(Configuration.java:1338) > Jun 21 02:44:38 at > org.apache.hadoop.conf.Configuration.readFields(Configuration.java:3798) > Jun 21 02:44:38 at > org.apache.flink.formats.parquet.utils.SerializableConfiguration.readObject(SerializableConfiguration.java:50) > Jun 21 02:44:38 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Jun 21 02:44:38 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > {code} > {code:java} > Jun 21 02:44:42 [ERROR] Run 1: > com.google.common.base.Preconditions.checkState(ZLjava/lang/String;I)V > Jun 21 02:44:42 [ERROR] Run 2: > com.google.common.base.Preconditions.checkState(ZLjava/lang/String;I)V > Jun 21 02:44:42 [INFO] > Jun 21 02:44:42 [ERROR] ParquetColumnarRowSplitReaderTest.testProject > Jun 21 02:44:42 [ERROR] Run 1: > com.google.common.base.Preconditions.checkState(ZLjava/lang/String;I)V > Jun 21 02:44:42 [ERROR] Run 2: > 
com.google.common.base.Preconditions.checkState(ZLjava/lang/String;I)V > Jun 21 02:44:42 [INFO] > Jun 21 02:44:42 [ERROR] ParquetColumnarRowSplitReaderTest.testReachEnd > Jun 21 02:44:42 [ERROR] Run 1: > com.google.common.base.Preconditions.checkState(ZLjava/lang/String;I)V > Jun 21 02:44:42 [ERROR] Run 2: > com.google.common.base.Preconditions.checkState(ZLjava/lang/String;I)V > Jun 21 02:44:42 [INFO] > Jun 21 02:44:42 [ERROR] > AvroParquetRecordFormatTest.testCreateGenericReader:161->createReader:269 » > NoSuchMethod > Jun 21 02:44:42 [ERROR] > AvroParquetRecordFormatTest.testCreateReflectReader:133->createReader:269 » > NoSuchMethod > Jun 21 02:44:42 [ERROR] > AvroParquetRecordFormatTest.testCreateSpecificReader:118->createReader:269 » > NoSuchMethod > Jun 21 02:44:42 [ERROR] > AvroParquetRecordFormatTest.testReadWithRestoreGenericReader:203->restoreReader:293 > » NoSuchMethod > Jun 21 02:44:42 [ERROR] > AvroParquetRecordFormatTest.testReflectReadFromGenericRecords:147->createReader:269 > » NoSuchMethod > Jun 21 02:44:42 [ERROR] ParquetRowDataWriterTest.testCompression:126 » > NoSuchMethod com.google.common > Jun 21 02:44:42 [ERROR] > ParquetRowDataWriterTest.testTypes:117->innerTest:168 » NoSuchMethod > com.googl... > Jun 21 02:44:42 [ERROR] SerializableConfigurationTest.testResource:45 » > NoSuchMethod com.google.common... > Jun 21 02:44:42 [INFO] > Jun 21 02:44:42 [ERROR] Tests run: 31, Failures: 0, Errors: 24, Skipped: 0 > Jun 21 02:44:42 [INFO] > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36979=logs=7e3d33c3-a462-5ea8-98b8-27e1aafe4ceb=ef77f8d1-44c8-5ee2-f175-1c88f61de8c0=16375 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-28207) Disabling webhook should also disable mutator
[ https://issues.apache.org/jira/browse/FLINK-28207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557763#comment-17557763 ] Aitozi commented on FLINK-28207: The option was introduced to let users choose to enable/disable the validator and mutator individually. But for compatibility, the validator option was kept as {{webhook.create}}; I think it would be better named {{webhook.validator.create}}. The context is [here|https://github.com/apache/flink-kubernetes-operator/pull/265#discussion_r895715818]. Another solution: we could add {{webhook.validator.create}} and let {{webhook.create}} control both. WDYT?

> Disabling webhook should also disable mutator
> -
>
> Key: FLINK-28207
> URL: https://issues.apache.org/jira/browse/FLINK-28207
> Project: Flink
> Issue Type: Improvement
> Components: Kubernetes Operator
> Reporter: Márton Balassi
> Assignee: Márton Balassi
> Priority: Major
> Fix For: kubernetes-operator-1.1.0
>
> The configuration for the mutating webhook suggests that it is nested inside the (validating) webhook: https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/values.yaml#L73-L76
> Based on this I would expect that disabling the top-level webhook also disables the mutator; however, this is not the case: https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/templates/webhook.yaml#L19-L79 https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/templates/webhook.yaml#L115-L148
> I do not see a use case currently where we would want the mutating webhook without the validating one, so I suggest following the hierarchy that the helm configs imply.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-28206) EOFException on Checkpoint Recovery
[ https://issues.apache.org/jira/browse/FLINK-28206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557762#comment-17557762 ] Chenya Zhang commented on FLINK-28206: -- +1 that we are seeing similar exceptions today when running with Flink 1.14 to read/deserialize from checkpoint states: {code:java} org.apache.flink.util.FlinkRuntimeException: Unexpected list element deserialization failure at org.apache.flink.runtime.state.ListDelimitedSerializer.deserializeNextElement(ListDelimitedSerializer.java:89) at org.apache.flink.runtime.state.ListDelimitedSerializer.deserializeList(ListDelimitedSerializer.java:51) at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:120) at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:112) at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:61) at org.apache.flink.runtime.state.metrics.LatencyTrackingListState.get(LatencyTrackingListState.java:63) at org.apache.flink.runtime.state.metrics.LatencyTrackingListState.get(LatencyTrackingListState.java:34) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:475) at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:603) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:239) at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:200) at 
org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:136) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.io.EOFException at org.apache.flink.core.memory.DataInputDeserializer.readFully(DataInputDeserializer.java:172) at org.apache.flink.formats.avro.utils.DataInputDecoder.readBytes(DataInputDecoder.java:95) at org.apache.avro.io.ResolvingDecoder.readBytes(ResolvingDecoder.java:243) at org.apache.avro.generic.GenericDatumReader.readBytes(GenericDatumReader.java:543) at org.apache.avro.generic.GenericDatumReader.readBytes(GenericDatumReader.java:534) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:193) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259) at 
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) at ... 25 more{code} > EOFException on Checkpoint Recovery > --- > > Key: FLINK-28206 > URL:
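The root cause at the bottom of the trace is `DataInputDeserializer.readFully` hitting the end of the serialized state mid-element. A stripped-down, hedged illustration of that failure mode (plain JDK streams, not Flink code): `readFully` demands a fixed number of bytes, and a truncated or mis-framed serialized element simply runs out of input.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

public class EofDemo {

    // Returns true if the stream ends before `expected` bytes could be read,
    // mirroring the readFully call that fails in the stack trace above.
    static boolean runsOutOfBytes(byte[] available, int expected) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(available))) {
            in.readFully(new byte[expected]); // throws EOFException if input is short
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runsOutOfBytes(new byte[4], 8)); // true: element truncated
        System.out.println(runsOutOfBytes(new byte[8], 8)); // false: enough bytes
    }
}
```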
[GitHub] [flink-web] HuangXingBo commented on a diff in pull request #554: Release flink 1.15.1
HuangXingBo commented on code in PR #554: URL: https://github.com/apache/flink-web/pull/554#discussion_r904450779 ## _posts/2022-06-29-release-1.15.1.md: ## @@ -0,0 +1,187 @@ +--- +layout: post +title: "Apache Flink 1.15.1 Release Announcement" +date: 2022-06-29T08:00:00.000Z +categories: news +authors: +- knaufk: + name: "David Anderson" + twitter: "alpinegizmo" + +excerpt: The Apache Flink Community is please to announce a bug fix release for Flink 1.15. Review Comment: ```suggestion excerpt: The Apache Flink Community is pleased to announce a bug fix release for Flink 1.15. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] (FLINK-28198) CassandraConnectorITCase#testRaiseCassandraRequestsTimeouts fails with timeout
[ https://issues.apache.org/jira/browse/FLINK-28198 ] Huang Xingbo deleted comment on FLINK-28198: -- was (Author: hxbks2ks): https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37037=logs=4eda0b4a-bd0d-521a-0916-8285b9be9bb5=2ff6d5fa-53a6-53ac-bff7-fa524ea361a9 > CassandraConnectorITCase#testRaiseCassandraRequestsTimeouts fails with timeout > -- > > Key: FLINK-28198 > URL: https://issues.apache.org/jira/browse/FLINK-28198 > Project: Flink > Issue Type: Bug > Components: Connectors / Cassandra >Affects Versions: 1.16.0 >Reporter: Martijn Visser >Priority: Critical > Labels: test-stability > > {code:java} > Jun 22 07:57:37 [ERROR] > org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.testRaiseCassandraRequestsTimeouts > Time elapsed: 12.067 s <<< ERROR! > Jun 22 07:57:37 > com.datastax.driver.core.exceptions.OperationTimedOutException: > [/172.17.0.1:59915] Timed out waiting for server response > Jun 22 07:57:37 at > com.datastax.driver.core.exceptions.OperationTimedOutException.copy(OperationTimedOutException.java:43) > Jun 22 07:57:37 at > com.datastax.driver.core.exceptions.OperationTimedOutException.copy(OperationTimedOutException.java:25) > Jun 22 07:57:37 at > com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:35) > Jun 22 07:57:37 at > com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:293) > Jun 22 07:57:37 at > com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:58) > Jun 22 07:57:37 at > com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39) > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37037=logs=4eda0b4a-bd0d-521a-0916-8285b9be9bb5=2ff6d5fa-53a6-53ac-bff7-fa524ea361a9=13736 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-28198) CassandraConnectorITCase#testRaiseCassandraRequestsTimeouts fails with timeout
[ https://issues.apache.org/jira/browse/FLINK-28198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557761#comment-17557761 ] Huang Xingbo commented on FLINK-28198: -- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37037=logs=4eda0b4a-bd0d-521a-0916-8285b9be9bb5=2ff6d5fa-53a6-53ac-bff7-fa524ea361a9 > CassandraConnectorITCase#testRaiseCassandraRequestsTimeouts fails with timeout > -- > > Key: FLINK-28198 > URL: https://issues.apache.org/jira/browse/FLINK-28198 > Project: Flink > Issue Type: Bug > Components: Connectors / Cassandra >Affects Versions: 1.16.0 >Reporter: Martijn Visser >Priority: Major > Labels: test-stability > > {code:java} > Jun 22 07:57:37 [ERROR] > org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.testRaiseCassandraRequestsTimeouts > Time elapsed: 12.067 s <<< ERROR! > Jun 22 07:57:37 > com.datastax.driver.core.exceptions.OperationTimedOutException: > [/172.17.0.1:59915] Timed out waiting for server response > Jun 22 07:57:37 at > com.datastax.driver.core.exceptions.OperationTimedOutException.copy(OperationTimedOutException.java:43) > Jun 22 07:57:37 at > com.datastax.driver.core.exceptions.OperationTimedOutException.copy(OperationTimedOutException.java:25) > Jun 22 07:57:37 at > com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:35) > Jun 22 07:57:37 at > com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:293) > Jun 22 07:57:37 at > com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:58) > Jun 22 07:57:37 at > com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39) > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37037=logs=4eda0b4a-bd0d-521a-0916-8285b9be9bb5=2ff6d5fa-53a6-53ac-bff7-fa524ea361a9=13736 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (FLINK-28198) CassandraConnectorITCase#testRaiseCassandraRequestsTimeouts fails with timeout
[ https://issues.apache.org/jira/browse/FLINK-28198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo updated FLINK-28198: - Priority: Critical (was: Major) > CassandraConnectorITCase#testRaiseCassandraRequestsTimeouts fails with timeout > -- > > Key: FLINK-28198 > URL: https://issues.apache.org/jira/browse/FLINK-28198 > Project: Flink > Issue Type: Bug > Components: Connectors / Cassandra >Affects Versions: 1.16.0 >Reporter: Martijn Visser >Priority: Critical > Labels: test-stability > > {code:java} > Jun 22 07:57:37 [ERROR] > org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.testRaiseCassandraRequestsTimeouts > Time elapsed: 12.067 s <<< ERROR! > Jun 22 07:57:37 > com.datastax.driver.core.exceptions.OperationTimedOutException: > [/172.17.0.1:59915] Timed out waiting for server response > Jun 22 07:57:37 at > com.datastax.driver.core.exceptions.OperationTimedOutException.copy(OperationTimedOutException.java:43) > Jun 22 07:57:37 at > com.datastax.driver.core.exceptions.OperationTimedOutException.copy(OperationTimedOutException.java:25) > Jun 22 07:57:37 at > com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:35) > Jun 22 07:57:37 at > com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:293) > Jun 22 07:57:37 at > com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:58) > Jun 22 07:57:37 at > com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39) > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37037=logs=4eda0b4a-bd0d-521a-0916-8285b9be9bb5=2ff6d5fa-53a6-53ac-bff7-fa524ea361a9=13736 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Assigned] (FLINK-26891) Emit events for important Deployment / Job changes
[ https://issues.apache.org/jira/browse/FLINK-26891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise reassigned FLINK-26891: Assignee: Thomas Weise (was: Matyas Orhidi) > Emit events for important Deployment / Job changes > -- > > Key: FLINK-26891 > URL: https://issues.apache.org/jira/browse/FLINK-26891 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Thomas Weise >Priority: Major > Fix For: kubernetes-operator-1.1.0 > > > We should try capturing the important deployment states, such as RUNNING, > FAILING, DEPLOYING -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Closed] (FLINK-27766) Introduce the framework of the SqlGatewayService
[ https://issues.apache.org/jira/browse/FLINK-27766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shengkai Fang closed FLINK-27766. - Fix Version/s: 1.16.0 Resolution: Fixed > Introduce the framework of the SqlGatewayService > > > Key: FLINK-27766 > URL: https://issues.apache.org/jira/browse/FLINK-27766 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Gateway >Reporter: Shengkai Fang >Assignee: Shengkai Fang >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-27766) Introduce the framework of the SqlGatewayService
[ https://issues.apache.org/jira/browse/FLINK-27766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557757#comment-17557757 ] Shengkai Fang commented on FLINK-27766: --- master:da08267c2cb29d98e626a867f8e07cb5b8e7f29a master:626bdacdbe303d9e9c346e7ba7f3d8c7b42b1d8e > Introduce the framework of the SqlGatewayService > > > Key: FLINK-27766 > URL: https://issues.apache.org/jira/browse/FLINK-27766 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Gateway >Reporter: Shengkai Fang >Assignee: Shengkai Fang >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink] fsk119 merged pull request #19823: [FLINK-27766][sql-gateway] Introduce the framework of the SqlGatewayService
fsk119 merged PR #19823: URL: https://github.com/apache/flink/pull/19823 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-28208) The method createBatchSink in class HiveTableSink should setParallelism for map operator
[ https://issues.apache.org/jira/browse/FLINK-28208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liu updated FLINK-28208: Description: The problem is found when using the Adaptive Batch Scheduler. In this case, a simple SQL statement like "select * from * where *" generates three operators: source, map and sink. The map's parallelism is set to -1 by default and does not match that of the source and sink, so the three operators cannot be chained together. The reason is that the map operator added in method createBatchSink does not call setParallelism. The changed code is as follows:
{code:java}
private DataStreamSink<Row> createBatchSink(
        DataStream<RowData> dataStream,
        DataStructureConverter converter,
        StorageDescriptor sd,
        HiveWriterFactory recordWriterFactory,
        OutputFileConfig fileNaming,
        final int parallelism)
        throws IOException {
    ...
    return dataStream
            .map((MapFunction<RowData, Row>) value -> (Row) converter.toExternal(value))
            .setParallelism(parallelism) // Newly added to ensure the right parallelism
            .writeUsingOutputFormat(builder.build())
            .setParallelism(parallelism);
}
{code}
was: The problem is found when using the Adaptive Batch Scheduler. In this case, a simple SQL statement like "select * from * where *" generates three operators: source, map and sink. The map's parallelism is set to -1 by default and does not match that of the source and sink, so the three operators cannot be chained together. The reason is that the map operator added in method createBatchSink does not call setParallelism. The changed code is as follows:
{code:java}
private DataStreamSink<Row> createBatchSink(
        DataStream<RowData> dataStream,
        DataStructureConverter converter,
        StorageDescriptor sd,
        HiveWriterFactory recordWriterFactory,
        OutputFileConfig fileNaming,
        final int parallelism)
        throws IOException {
    ...
    return dataStream
            .map((MapFunction<RowData, Row>) value -> (Row) converter.toExternal(value))
            .setParallelism(parallelism) // Newly added to ensure the right parallelism
            .writeUsingOutputFormat(builder.build())
            .setParallelism(parallelism);
}
{code}
> The method createBatchSink in class HiveTableSink should setParallelism for > map operator > > > Key: FLINK-28208 > URL: https://issues.apache.org/jira/browse/FLINK-28208 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hive >Affects Versions: 1.16.0 >Reporter: Liu >Priority: Major > > The problem is found when using Adaptive Batch Scheduler. In these, a simple > SQL like "select * from * where *" would generate three operators including > source, map and sink. The map's parallelism is set to -1 by default and is > not the same with source and sink. As a result, the three operators can not > be chained together. > The reason is that we add map operator in method createBatchSink but not > setParallelism. The changed code is as following: > {code:java} > private DataStreamSink createBatchSink( > DataStream dataStream, > DataStructureConverter converter, > StorageDescriptor sd, > HiveWriterFactory recordWriterFactory, > OutputFileConfig fileNaming, > final int parallelism) > throws IOException { > ... > return dataStream > .map((MapFunction) value -> (Row) > converter.toExternal(value)) > .setParallelism(parallelism) // New added to ensure the right > parallelism .writeUsingOutputFormat(builder.build()) > .setParallelism(parallelism); > } {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-28208) The method createBatchSink in class HiveTableSink should setParallelism for map operator
Liu created FLINK-28208: --- Summary: The method createBatchSink in class HiveTableSink should setParallelism for map operator Key: FLINK-28208 URL: https://issues.apache.org/jira/browse/FLINK-28208 Project: Flink Issue Type: Improvement Components: Connectors / Hive Affects Versions: 1.16.0 Reporter: Liu The problem is found when using the Adaptive Batch Scheduler. In this case, a simple SQL statement like "select * from * where *" generates three operators: source, map and sink. The map's parallelism is set to -1 by default and does not match that of the source and sink, so the three operators cannot be chained together. The reason is that the map operator added in method createBatchSink does not call setParallelism. The changed code is as follows:
{code:java}
private DataStreamSink<Row> createBatchSink(
        DataStream<RowData> dataStream,
        DataStructureConverter converter,
        StorageDescriptor sd,
        HiveWriterFactory recordWriterFactory,
        OutputFileConfig fileNaming,
        final int parallelism)
        throws IOException {
    ...
    return dataStream
            .map((MapFunction<RowData, Row>) value -> (Row) converter.toExternal(value))
            .setParallelism(parallelism) // Newly added to ensure the right parallelism
            .writeUsingOutputFormat(builder.build())
            .setParallelism(parallelism);
}
{code}
-- This message was sent by Atlassian Jira (v8.20.7#820007)
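The issue above boils down to a parallelism mismatch: operators with different parallelism cannot be chained. A minimal, hedged sketch of that condition (simplified and standalone — Flink's real chaining check involves more conditions than parallelism alone, and `-1` resolving to a configured default is assumed here for illustration):

```java
public class ChainCheck {

    // A parallelism of -1 means "unset" and resolves to the configured default.
    static int resolve(int parallelism, int configDefault) {
        return parallelism == -1 ? configDefault : parallelism;
    }

    // Chaining requires (among other things) equal resolved parallelism.
    static boolean sameParallelism(int upstream, int downstream, int configDefault) {
        return resolve(upstream, configDefault) == resolve(downstream, configDefault);
    }

    public static void main(String[] args) {
        int sinkParallelism = 8, configDefault = 1;
        // The map left at -1 resolves to the default and breaks the chain:
        System.out.println(sameParallelism(-1, sinkParallelism, configDefault)); // false
        // Explicitly setting the map's parallelism restores chainability:
        System.out.println(sameParallelism(sinkParallelism, sinkParallelism, configDefault)); // true
    }
}
```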
[jira] [Closed] (FLINK-28194) Remove workaround around avro sql jar
[ https://issues.apache.org/jira/browse/FLINK-28194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu closed FLINK-28194. --- Resolution: Fixed Merged to master via 40efeb9314fe70bbc600a9131a7031ac193f246d > Remove workaround around avro sql jar > - > > Key: FLINK-28194 > URL: https://issues.apache.org/jira/browse/FLINK-28194 > Project: Flink > Issue Type: Technical Debt > Components: API / Python >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > Because of FLINK-17417 flink-python contains a workaround that manually > assembles a sort-of avro sql jar. > Rely on the sql-avro jar instead and remove the workaround. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink] dianfu merged pull request #20045: [FLINK-28194][python] Remove avro sql jar workaround
dianfu merged PR #20045: URL: https://github.com/apache/flink/pull/20045 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #114: [FLINK-27096] Optimize VectorAssembler performance
yunfengzhou-hub commented on code in PR #114: URL: https://github.com/apache/flink-ml/pull/114#discussion_r904423389 ## flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java: ## @@ -80,24 +82,68 @@ public Table[] transform(Table... inputs) { return new Table[] {outputTable}; } -private static class AssemblerFunc implements FlatMapFunction { +private static class AssemblerFunc extends RichFlatMapFunction { private final String[] inputCols; private final String handleInvalid; +/** The indices for assembling vectors. */ +private transient IntArrayList indices; Review Comment: Why should we use `IntArrayList`, instead of `List` and `Integer[]`? Would it bring performance improvement compared with these options? How about implementing it in two loops? In the first loop, we only extract the vector/numbers, and calculate their total size. Then we allocate the integer and double arrays according to the calculated size, and assign the values in the second loop. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
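The two-loop approach suggested in this review can be sketched roughly as follows. This is a hypothetical standalone illustration using plain `double[]` inputs and `Number` scalars — not the actual VectorAssembler code, which handles Flink vector types, sparse indices, and invalid values:

```java
import java.util.Arrays;

public class TwoPassAssembler {

    // First loop: only measure the total output size. Second loop: copy values
    // into a single pre-sized array, avoiding growable intermediate lists.
    public static double[] assemble(Object... inputs) {
        int totalSize = 0;
        for (Object o : inputs) {
            totalSize += (o instanceof double[]) ? ((double[]) o).length : 1;
        }
        double[] result = new double[totalSize];
        int offset = 0;
        for (Object o : inputs) {
            if (o instanceof double[]) {
                double[] v = (double[]) o;
                System.arraycopy(v, 0, result, offset, v.length);
                offset += v.length;
            } else {
                result[offset++] = ((Number) o).doubleValue();
            }
        }
        return result;
    }

    public static void main(String[] args) {
        double[] out = assemble(new double[] {1.0, 2.0}, 3.5, new double[] {4.0});
        System.out.println(Arrays.toString(out)); // [1.0, 2.0, 3.5, 4.0]
    }
}
```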
[GitHub] [flink-table-store] LadyForest closed pull request #167: [FLINK-28129] Add documentation for rescale bucket
LadyForest closed pull request #167: [FLINK-28129] Add documentation for rescale bucket URL: https://github.com/apache/flink-table-store/pull/167 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-27963) FlinkRuntimeException in KafkaSink causes a Flink job to hang
[ https://issues.apache.org/jira/browse/FLINK-27963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557728#comment-17557728 ] Dmytro commented on FLINK-27963: [~renqs], do you know by any chance when you will be able to implement this feature/fix? The exception handling in Kafka connectors is still a problem on our side. Thank you! > FlinkRuntimeException in KafkaSink causes a Flink job to hang > - > > Key: FLINK-27963 > URL: https://issues.apache.org/jira/browse/FLINK-27963 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.15.0, 1.14.4 >Reporter: Dmytro >Priority: Major > Labels: FlinkRuntimeException, KafkaSink > > If FlinkRuntimeException occurs in the > [KafkaSink|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#kafka-sink] > then the Flink job tries to re-send failed data again and gets into endless > loop "exception->send again" > *Code sample which throws the FlinkRuntimeException:* > {code:java} > int numberOfRows = 1; > int rowsPerSecond = 1; > DataStream stream = environment.addSource( > new DataGeneratorSource<>( > RandomGenerator.stringGenerator(105), // > max.message.bytes=1048588 > rowsPerSecond, > (long) numberOfRows), > TypeInformation.of(String.class)) > .setParallelism(1) > .name("string-generator"); > KafkaSinkBuilder builder = KafkaSink.builder() > .setBootstrapServers("localhost:9092") > .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) > .setRecordSerializer( > > KafkaRecordSerializationSchema.builder().setTopic("test.output") > .setValueSerializationSchema(new > SimpleStringSchema()) > .build()); > KafkaSink sink = builder.build(); > stream.sinkTo(sink).setParallelism(1).name("output-producer"); {code} > *Exception Stack Trace:* > {code:java} > 2022-06-02/14:01:45.066/PDT [flink-akka.actor.default-dispatcher-4] INFO > output-producer: Writer -> output-producer: Committer (1/1) > (a66beca5a05c1c27691f7b94ca6ac025) switched from RUNNING to 
FAILED on > 271b1b90-7d6b-4a34-8116-3de6faa8a9bf @ 127.0.0.1 (dataPort=-1). > org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka > null with FlinkKafkaInternalProducer{transactionalId='null', > inTransaction=false, closed=false} at > org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:440) > ~[flink-connector-kafka-1.15.0.jar:1.15.0] at > org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:421) > ~[flink-connector-kafka-1.15.0.jar:1.15.0] at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > ~[flink-streaming-java-1.15.0.jar:1.15.0] at > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > ~[flink-streaming-java-1.15.0.jar:1.15.0] at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) > ~[flink-streaming-java-1.15.0.jar:1.15.0] at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) > ~[flink-streaming-java-1.15.0.jar:1.15.0] at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > ~[flink-streaming-java-1.15.0.jar:1.15.0] at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) > ~[flink-streaming-java-1.15.0.jar:1.15.0] at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) > ~[flink-streaming-java-1.15.0.jar:1.15.0] at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > ~[flink-runtime-1.15.0.jar:1.15.0] at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > ~[flink-runtime-1.15.0.jar:1.15.0] at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > ~[flink-runtime-1.15.0.jar:1.15.0] at > 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > ~[flink-runtime-1.15.0.jar:1.15.0] at java.lang.Thread.run(Thread.java:748) > ~[?:1.8.0_292] Caused by: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1050088 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. {code} > ** -- This message was sent by Atlassian Jira (v8.20.7#820007)
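The RecordTooLargeException at the end of the trace is a plain size check against Kafka's `max.request.size` (default 1048576 bytes); the 1050088-byte serialized record from the log exceeds it. A trivial, hedged sketch of that check, using the numbers from the report (illustrative only, not the Kafka producer code):

```java
public class MessageSizeCheck {

    // Default value of the Kafka producer's max.request.size configuration.
    static final int DEFAULT_MAX_REQUEST_SIZE = 1_048_576;

    // A serialized record is only accepted if it fits under the limit.
    static boolean fits(byte[] serialized) {
        return serialized.length <= DEFAULT_MAX_REQUEST_SIZE;
    }

    public static void main(String[] args) {
        byte[] tooBig = new byte[1_050_088]; // size reported in the stack trace
        System.out.println(fits(tooBig)); // false: RecordTooLargeException territory
    }
}
```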
[jira] [Commented] (FLINK-28207) Disabling webhook should also disable mutator
[ https://issues.apache.org/jira/browse/FLINK-28207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557677#comment-17557677 ] Márton Balassi commented on FLINK-28207: [~aitozi] do you have any concerns? > Disabling webhook should also disable mutator > - > > Key: FLINK-28207 > URL: https://issues.apache.org/jira/browse/FLINK-28207 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Márton Balassi >Assignee: Márton Balassi >Priority: Major > Fix For: kubernetes-operator-1.1.0 > > > The configuration for the mutating webhook suggests that it is nested inside > the (validating) webhook: > https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/values.yaml#L73-L76 > Based on this I would expect that if I disable the top level webhook it also > disables the mutator, however this is not the case: > https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/templates/webhook.yaml#L19-L79 > https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/templates/webhook.yaml#L115-L148 > I do not see a use case currently where we would want the mutating webhook > without having the validating one, so I suggest following the hierarchy that > the helm configs imply. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-28207) Disabling webhook should also disable mutator
Márton Balassi created FLINK-28207: -- Summary: Disabling webhook should also disable mutator Key: FLINK-28207 URL: https://issues.apache.org/jira/browse/FLINK-28207 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Márton Balassi Assignee: Márton Balassi Fix For: kubernetes-operator-1.1.0 The configuration for the mutating webhook suggests that it is nested inside the (validating) webhook: https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/values.yaml#L73-L76 Based on this I would expect that if I disable the top level webhook it also disables the mutator, however this is not the case: https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/templates/webhook.yaml#L19-L79 https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/templates/webhook.yaml#L115-L148 I do not see a use case currently where we would want the mutating webhook without having the validating one, so I suggest following the hierarchy that the helm configs imply. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (FLINK-28204) Deleting a FlinkDeployment results in an error on the pod
[ https://issues.apache.org/jira/browse/FLINK-28204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Márton Balassi updated FLINK-28204: --- Fix Version/s: kubernetes-operator-1.0.1 > Deleting a FlinkDeployment results in an error on the pod > - > > Key: FLINK-28204 > URL: https://issues.apache.org/jira/browse/FLINK-28204 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.0.0 > Environment: AWS EKS > > {code:java} > kubectl version > > > Client Version: version.Info{Major:"1", Minor:"23", GitVersion:"v1.23.8", > GitCommit:"a12b886b1da059e0190c54d09c5eab5219dd7acf", GitTreeState:"clean", > BuildDate:"2022-06-17T22:27:29 > Z", GoVersion:"go1.17.10", Compiler:"gc", Platform:"linux/amd64"} > Server Version: version.Info{Major:"1", Minor:"22+", > GitVersion:"v1.22.9-eks-a64ea69", > GitCommit:"540410f9a2e24b7a2a870ebfacb3212744b5f878", GitTreeState:"clean", > BuildDate:"2022-0 > 5-12T19:15:31Z", GoVersion:"go1.16.15", Compiler:"gc", Platform:"linux/amd64"} > {code} >Reporter: Matt Casters >Priority: Critical > Fix For: kubernetes-operator-1.0.1 > > > I didn't configure the memory settings of my Flink cluster correctly in the > Flink deployment Yaml. > So I thought I would delete the deployment but I'm getting this error in the > log of the f-k-o pod: > {code:java} > 2022-06-22 13:19:13,521 o.a.f.k.o.c.FlinkDeploymentController [INFO > ][default/apache-hop-flink] Deleting FlinkDeployment > 2022-06-22 13:19:13,521 i.j.o.p.e.ReconciliationDispatcher > [ERROR][default/apache-hop-flink] Error during event processing > ExecutionScope{ resource id: CustomResourceID{name='apache-hop-flink', > namespace='default'}, version: 23709} failed. > java.lang.RuntimeException: Cannot create observe config before first > deployment, this indicates a bug. 
> at > org.apache.flink.kubernetes.operator.config.FlinkConfigManager.getObserveConfig(FlinkConfigManager.java:137) > at > org.apache.flink.kubernetes.operator.service.FlinkService.cancelJob(FlinkService.java:357) > at > org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.shutdown(ApplicationReconciler.java:327) > at > org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractDeploymentReconciler.cleanup(AbstractDeploymentReconciler.java:56) > at > org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractDeploymentReconciler.cleanup(AbstractDeploymentReconciler.java:37) > at > org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.cleanup(FlinkDeploymentController.java:107) > at > org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.cleanup(FlinkDeploymentController.java:59) > at > io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:68) > at > io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:50) > at > io.javaoperatorsdk.operator.api.monitoring.Metrics.timeControllerExecution(Metrics.java:34) > at > io.javaoperatorsdk.operator.processing.Controller.cleanup(Controller.java:49) > at > io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleCleanup(ReconciliationDispatcher.java:252) > at > io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:72) > at > io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:50) > at > io.javaoperatorsdk.operator.processing.event.EventProcessor$ControllerExecution.run(EventProcessor.java:349) > at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown > Source) > at java.base/java.lang.Thread.run(Unknown Source) {code} > So in essence this leaves me in a state 
between not deployed and not able to delete the FlinkDeployment.
[jira] [Created] (FLINK-28206) EOFException on Checkpoint Recovery
uharaqo created FLINK-28206: --- Summary: EOFException on Checkpoint Recovery Key: FLINK-28206 URL: https://issues.apache.org/jira/browse/FLINK-28206 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.14.4 Reporter: uharaqo We have only one Job Manager in Kubernetes and it suddenly got killed without any logs. A new Job Manager process could not recover from a checkpoint due to an EOFException. Task Managers killed themselves since they could not find any Job Manager. There were no error logs other than that on the Task Manager side. It looks to me that the checkpoint is corrupted. Is there a way to identify the cause? What would you recommend us to do to mitigate this problem? Here's the logs during the recovery phase. (Removed the stacktrace. Please find that at the bottom.) {noformat} {"timestamp":"2022-06-22T17:21:25.870Z","level":"INFO","logger":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils","message":"Recovering checkpoints from KubernetesStateHandleStore{configMapName='univex-flink-record-collector-46071c6a64e47d1ce828dfe032f943a6-jobmanager-leader'}."} {"timestamp":"2022-06-22T17:21:25.875Z","level":"INFO","logger":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils","message":"Found 1 checkpoints in KubernetesStateHandleStore{configMapName='univex-flink-record-collector-46071c6a64e47d1ce828dfe032f943a6-jobmanager-leader'}."} {"timestamp":"2022-06-22T17:21:25.876Z","level":"INFO","logger":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils","message":"Trying to fetch 1 checkpoints from storage."} {"timestamp":"2022-06-22T17:21:25.876Z","level":"INFO","logger":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils","message":"Trying to retrieve checkpoint 58130."} {"timestamp":"2022-06-22T17:21:25.901Z","level":"ERROR","logger":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","message":"Fatal error occurred in the cluster 
entrypoint.","level":"INFO","logger":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","message":"Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally.."} {"timestamp":"2022-06-22T17:21:25.921Z","level":"INFO","logger":"org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint","message":"Shutting down rest endpoint."} {"timestamp":"2022-06-22T17:21:25.922Z","level":"INFO","logger":"org.apache.flink.runtime.blob.BlobServer","message":"Stopped BLOB server at 0.0.0.0:6124"} {noformat} The stacktrace of the ERROR: {noformat} org.apache.flink.util.FlinkException: JobMaster for job 46071c6a64e47d1ce828dfe032f943a6 failed. at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:913) at org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:473) at org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:450) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:427) at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:537) at akka.actor.Actor.aroundReceive$(Actor.scala:535) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) at
[jira] [Commented] (FLINK-28199) Failures on YARNHighAvailabilityITCase.testClusterClientRetrieval and YARNHighAvailabilityITCase.testKillYarnSessionClusterEntrypoint
[ https://issues.apache.org/jira/browse/FLINK-28199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557606#comment-17557606 ] Martijn Visser commented on FLINK-28199: It was indeed only once that this happened. Do we want to keep the ticket open to see if it was just a fluke or if this is going to re-appear more frequently? > Failures on YARNHighAvailabilityITCase.testClusterClientRetrieval and > YARNHighAvailabilityITCase.testKillYarnSessionClusterEntrypoint > - > > Key: FLINK-28199 > URL: https://issues.apache.org/jira/browse/FLINK-28199 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.16.0 >Reporter: Martijn Visser >Priority: Major > Labels: test-stability > > {code:java} > Jun 22 08:57:50 [ERROR] Errors: > Jun 22 08:57:50 [ERROR] > YARNHighAvailabilityITCase.testClusterClientRetrieval » Timeout > testClusterCli... > Jun 22 08:57:50 [ERROR] > YARNHighAvailabilityITCase.testKillYarnSessionClusterEntrypoint:156->YarnTestBase.runTest:288->lambda$testKillYarnSessionClusterEntrypoint$0:182->waitForJobTermination:325 > » Execution > Jun 22 08:57:50 [INFO] > Jun 22 08:57:50 [ERROR] Tests run: 27, Failures: 0, Errors: 2, Skipped: 0 > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37037=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=29523 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Closed] (FLINK-28196) Rename hadoop.version property
[ https://issues.apache.org/jira/browse/FLINK-28196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-28196. Resolution: Fixed master: 3b50f19ad27a49c5b804e8e811cbb2062dcff003 > Rename hadoop.version property > -- > > Key: FLINK-28196 > URL: https://issues.apache.org/jira/browse/FLINK-28196 > Project: Flink > Issue Type: Sub-task > Components: Build System >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > Maven 3.8.5 had a change (as I understand it for consistency purposes) where > properties set on the command-line are also applied to upstream dependencies. > See https://issues.apache.org/jira/browse/MNG-7417 > In other words, since Hadoop has a {{hadoop.version}} property in their > parent pom, when we set this on CI it not only sets _our_ property, but also the > one from Hadoop. > We should prefix our property with "flink" to prevent such conflicts.
[GitHub] [flink] zentol merged pull request #20046: [FLINK-28196][build] Rename hadoop.version property
zentol merged PR #20046: URL: https://github.com/apache/flink/pull/20046 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zentol commented on pull request #20046: [FLINK-28196][build] Rename hadoop.version property
zentol commented on PR #20046: URL: https://github.com/apache/flink/pull/20046#issuecomment-1163349594 @flinkbot run azure
[GitHub] [flink] flinkbot commented on pull request #20053: [FLINK-28201][ci] Generalize utils around dependency plugin
flinkbot commented on PR #20053: URL: https://github.com/apache/flink/pull/20053#issuecomment-1163342659

## CI report:

* 50da2a67713f98edca34821b185a68254a41efb8 UNKNOWN

Bot commands: The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] zentol commented on a diff in pull request #20053: [FLINK-28201][ci] Generalize utils around dependency plugin
zentol commented on code in PR #20053: URL: https://github.com/apache/flink/pull/20053#discussion_r903987642 ## tools/ci/java-ci-tools/src/main/java/org/apache/flink/tools/ci/suffixcheck/ScalaSuffixChecker.java: ## @@ -96,40 +98,35 @@ private static ParseResult parseMavenOutput(final Path path) throws IOException final Set cleanModules = new HashSet<>(); final Set infectedModules = new HashSet<>(); -try (final BufferedReader bufferedReader = -Files.newBufferedReader(path, StandardCharsets.UTF_8)) { +final Map> dependenciesByModule = +DependencyParser.parseDependencyTree(path); -String line; -while ((line = bufferedReader.readLine()) != null) { -final Matcher matcher = moduleNamePattern.matcher(line); -if (matcher.matches()) { -final String moduleName = stripScalaSuffix(matcher.group(1)); +for (String module : dependenciesByModule.keySet()) { +final String moduleName = stripScalaSuffix(module); +{ +{ Review Comment: These are only here to make the review easier (keeping the indentation levels the same), and are removed in the second commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-28201) Generalize utils around dependency-plugin
[ https://issues.apache.org/jira/browse/FLINK-28201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28201: --- Labels: pull-request-available (was: ) > Generalize utils around dependency-plugin > - > > Key: FLINK-28201 > URL: https://issues.apache.org/jira/browse/FLINK-28201 > Project: Flink > Issue Type: Sub-task > Components: Build System, Build System / CI >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > We'll be adding another safeguard against developer mistakes which also > parses the output of the dependency plugin, like the scala suffix checker. > We should generalize this parsing such that both checks can use the same code. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink] zentol opened a new pull request, #20053: [FLINK-28201][ci] Generalize utils around dependency plugin
zentol opened a new pull request, #20053: URL: https://github.com/apache/flink/pull/20053 This PR reworks the parsing of the output from the dependency-plugin to make it more generic and re-usable for other checks. The scala suffix checker, the only current user, was migrated as part of this PR.
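The parsing this PR generalizes can be pictured with a small standalone parser for `mvn dependency:tree` output. This is a hypothetical sketch, not the actual `DependencyParser` from `java-ci-tools`; the regexes assume the dependency plugin's usual `[INFO]`-prefixed log lines:

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch: collect, per Maven module, the dependency coordinates it declares. */
public final class DependencyTreeParser {
    // Matches lines like "[INFO] +- org.apache.flink:flink-core:jar:1.16.0:compile"
    private static final Pattern DEPENDENCY =
            Pattern.compile("\\[INFO\\]\\s+[+\\\\|\\s-]*([\\w.-]+:[\\w.-]+):jar:([\\w.-]+):(\\w+)");
    // Matches the header the plugin prints before each module's tree
    private static final Pattern MODULE =
            Pattern.compile("\\[INFO\\] --- maven-dependency-plugin:.*@ ([\\w.-]+) ---");

    public static Map<String, Set<String>> parse(List<String> lines) {
        Map<String, Set<String>> result = new LinkedHashMap<>();
        String currentModule = null;
        for (String line : lines) {
            Matcher module = MODULE.matcher(line);
            if (module.find()) {
                currentModule = module.group(1);
                result.putIfAbsent(currentModule, new LinkedHashSet<>());
                continue;
            }
            if (currentModule != null) {
                Matcher dep = DEPENDENCY.matcher(line);
                if (dep.find()) {
                    // store as groupId:artifactId:version
                    result.get(currentModule).add(dep.group(1) + ":" + dep.group(2));
                }
            }
        }
        return result;
    }
}
```

Keyed by module, the result would let both the scala suffix checker and future checks share a single pass over the build log.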
[GitHub] [flink] rkhachatryan commented on pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode
rkhachatryan commented on PR #19448: URL: https://github.com/apache/flink/pull/19448#issuecomment-1163264334 I've compared the performance results. The top regressions are:
```
org.apache.flink.benchmark.MultipleInputBenchmark.multiInputOneIdleMapSink            15.28%
org.apache.flink.benchmark.InputBenchmark.mapRebalanceMapSink                         11.73%
org.apache.flink.benchmark.MultiInputCheckpointingTimeBenchmark.checkpointMultiInput   7.01%
org.apache.flink.benchmark.WindowBenchmarks.globalWindow                               5.72%
org.apache.flink.benchmark.MemoryStateBackendBenchmark.stateBackends                   3.84%
org.apache.flink.benchmark.SortingBoundedInputBenchmarks.sortedMultiInput              3.59%
org.apache.flink.benchmark.MemoryStateBackendBenchmark.stateBackends                   3.58%
org.apache.flink.benchmark.TwoInputBenchmark.twoInputMapSink                           3.42%
org.apache.flink.benchmark.WindowBenchmarks.tumblingWindow                             3.29%
org.apache.flink.benchmark.AsyncWaitOperatorBenchmark.asyncWait                        2.42%
```
These are either caused by noisy benchmarks (e.g. `multiInputOneIdleMapSink`) and lie inside the usual bounds, or, probably, by a recent regression. So I'd suggest waiting for the results of the investigation of the latter and then re-doing the benchmark.
[jira] [Commented] (FLINK-28187) Duplicate job submission for FlinkSessionJob
[ https://issues.apache.org/jira/browse/FLINK-28187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557522#comment-17557522 ] Gyula Fora commented on FLINK-28187: We do upgrades in 2 steps, in the UPGRADING state the expected upgrade target generation is already in the status. We have to put the generation in the jobid otherwise we don't know if a job in upgrading state was already upgraded or not. Please look at this commit: [https://github.com/apache/flink-kubernetes-operator/commit/ab59d6eb980512775590d0d01e697fe0c28d1b3b] This is not so different how applications work also. You always have a single application cluster but still you need to attach the generation info otherwise you cannot deal with errors happening during or directly after submission. > Duplicate job submission for FlinkSessionJob > > > Key: FLINK-28187 > URL: https://issues.apache.org/jira/browse/FLINK-28187 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.0.0 >Reporter: Jeesmon Jacob >Priority: Critical > Attachments: flink-operator-log.txt > > > During a session job submission if a deployment error (ex: > concurrent.TimeoutException) is hit, operator will submit the job again. But > first submission could have succeeded in jobManager side and second > submission could result in duplicate job. Operator log attached. > Per [~gyfora]: > The problem is that in case a deployment error was hit, the > SessionJobObserver will not be able to tell whether it has submitted the job > or not. So it will simply try to submit it again. We have to find a mechanism > to correlate Jobs on the cluster with the SessionJob CR itself. Maybe we > could override the job name itself for this purpose or something like that. -- This message was sent by Atlassian Jira (v8.20.7#820007)
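The scheme Gyula describes, encoding the CR generation in the job id, can be sketched in plain Java. This is a hypothetical illustration rather than the operator's actual implementation: it hashes `namespace/name@generation` down to 16 bytes, the size of Flink's JobID, so the same generation always maps to the same id and the observer can tell which generation a running job was submitted for.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

/** Hypothetical sketch: deterministic job id bytes derived from a session job CR and its generation. */
public final class SessionJobIds {
    private SessionJobIds() {}

    /** Hashes "namespace/name@generation" and truncates to Flink's 16-byte JobID size. */
    public static byte[] jobIdBytes(String namespace, String name, long generation) {
        try {
            byte[] digest =
                    MessageDigest.getInstance("SHA-256")
                            .digest((namespace + "/" + name + "@" + generation)
                                    .getBytes(StandardCharsets.UTF_8));
            return Arrays.copyOf(digest, 16); // a Flink JobID wraps exactly 16 bytes
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is required to be available", e);
        }
    }
}
```

Because the id is a pure function of the CR identity and generation, a submission that timed out but actually succeeded can be detected on the next reconcile: the observer finds a running job whose id already encodes the current generation and skips resubmission.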
[jira] [Commented] (FLINK-28187) Duplicate job submission for FlinkSessionJob
[ https://issues.apache.org/jira/browse/FLINK-28187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557518#comment-17557518 ] Aitozi commented on FLINK-28187: IMO, there is one and only one job for one FlinkSessionJob, so I think the JobID associated with the resource UID will be enough here > Duplicate job submission for FlinkSessionJob > > > Key: FLINK-28187 > URL: https://issues.apache.org/jira/browse/FLINK-28187 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.0.0 >Reporter: Jeesmon Jacob >Priority: Critical > Attachments: flink-operator-log.txt > > > During a session job submission if a deployment error (ex: > concurrent.TimeoutException) is hit, operator will submit the job again. But > first submission could have succeeded in jobManager side and second > submission could result in duplicate job. Operator log attached. > Per [~gyfora]: > The problem is that in case a deployment error was hit, the > SessionJobObserver will not be able to tell whether it has submitted the job > or not. So it will simply try to submit it again. We have to find a mechanism > to correlate Jobs on the cluster with the SessionJob CR itself. Maybe we > could override the job name itself for this purpose or something like that. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Comment Edited] (FLINK-28187) Duplicate job submission for FlinkSessionJob
[ https://issues.apache.org/jira/browse/FLINK-28187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557513#comment-17557513 ] Aitozi edited comment on FLINK-28187 at 6/22/22 3:19 PM: - I'm afraid of not clearly expressing my meaning. I will try to give an example about what I think: 1. Submit the job with {{Generation1}} , and JobID is generated {{ns/name@Generation1}} 2. The submission timeout but actually succeed and the last reconcile spec not updated 3. User change the spec and the generation become {{Generation2}} (Before the observer have sync the job status and update the last reconcile spec) 4. The observer observe the job with JobID {{ns/name@Generation2}} not match the first job 5. The reconciler reconcile to submit the job with {{Generation2}}. In this sequence, the job {{ns/name@Generation1}} will be orphaned. was (Author: aitozi): I'm afraid of not clearly expressing my meaning. I will try to give an example about what I think: 1. Submit the job with {{Generation1}} , and JobID is generated {{ns/name@Generation1}} 2. The submit timeout but actually succeed and the last reconcile spec not updated 3. User change the spec and the generation become {{Generation2}} (Before the observer have sync the job status and update the last reconcile spec) 4. The observer observe the job with JobID {{ns/name@Generation2}} not match the first job 5. The reconciler reconcile to submit the job with {{Generation2}}. In this sequence, the job {{ns/name@Generation1}} will be orphaned. > Duplicate job submission for FlinkSessionJob > > > Key: FLINK-28187 > URL: https://issues.apache.org/jira/browse/FLINK-28187 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.0.0 >Reporter: Jeesmon Jacob >Priority: Critical > Attachments: flink-operator-log.txt > > > During a session job submission if a deployment error (ex: > concurrent.TimeoutException) is hit, operator will submit the job again. 
But > first submission could have succeeded in jobManager side and second > submission could result in duplicate job. Operator log attached. > Per [~gyfora]: > The problem is that in case a deployment error was hit, the > SessionJobObserver will not be able to tell whether it has submitted the job > or not. So it will simply try to submit it again. We have to find a mechanism > to correlate Jobs on the cluster with the SessionJob CR itself. Maybe we > could override the job name itself for this purpose or something like that. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Comment Edited] (FLINK-28187) Duplicate job submission for FlinkSessionJob
[ https://issues.apache.org/jira/browse/FLINK-28187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557513#comment-17557513 ] Aitozi edited comment on FLINK-28187 at 6/22/22 3:16 PM: - I'm afraid of not clearly expressing my meaning. I will try to give an example about what I think: 1. Submit the job with {{Generation1}} , and JobID is generated {{ns/name@Generation1}} 2. The submit timeout but actually succeed and the last reconcile spec not updated 3. User change the spec and the generation become {{Generation2}} (Before the observer have sync the job status and update the last reconcile spec) 4. The observer observe the job with JobID {{ns/name@Generation2}} not match the first job 5. The reconciler reconcile to submit the job with {{Generation2}}. In this sequence, the job {{ns/name@Generation1}} will be orphaned. was (Author: aitozi): I'm afraid of not clearly expressing my meaning. I will try to give an example about what I think: 1. Submit the job with {{Generation1}} , and JobID is generated {{ns/name@Generation1}} 2. The submit timeout but actually succeed and the last reconcile spec not updated 3. User change the spec and the generation become {{Generation2}} (Before the observer have sync the job status and update the last reconcile spec) 4. The observer observe the job with JobID {{ns/name@Generation2}} not match the first job 5. The reconciler reconcile to submit the job with {{Generation2}}. In this sequence, the job1 at generation1 will be orphaned. > Duplicate job submission for FlinkSessionJob > > > Key: FLINK-28187 > URL: https://issues.apache.org/jira/browse/FLINK-28187 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.0.0 >Reporter: Jeesmon Jacob >Priority: Critical > Attachments: flink-operator-log.txt > > > During a session job submission if a deployment error (ex: > concurrent.TimeoutException) is hit, operator will submit the job again. 
But > first submission could have succeeded in jobManager side and second > submission could result in duplicate job. Operator log attached. > Per [~gyfora]: > The problem is that in case a deployment error was hit, the > SessionJobObserver will not be able to tell whether it has submitted the job > or not. So it will simply try to submit it again. We have to find a mechanism > to correlate Jobs on the cluster with the SessionJob CR itself. Maybe we > could override the job name itself for this purpose or something like that. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-28187) Duplicate job submission for FlinkSessionJob
[ https://issues.apache.org/jira/browse/FLINK-28187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557513#comment-17557513 ] Aitozi commented on FLINK-28187: I'm afraid of not clearly expressing my meaning. I will try to give an example about what I think: 1. Submit the job with {{Generation1}} , and JobID is generated {{ns/name@Generation1}} 2. The submit timeout but actually succeed and the last reconcile spec not updated 3. User change the spec and the generation become {{Generation2}} (Before the observer have sync the job status and update the last reconcile spec) 4. The observer observe the job with JobID {{ns/name@Generation2}} not match the first job 5. The reconciler reconcile to submit the job with {{Generation2}}. In this sequence, the job1 at generation1 will be orphaned. > Duplicate job submission for FlinkSessionJob > > > Key: FLINK-28187 > URL: https://issues.apache.org/jira/browse/FLINK-28187 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.0.0 >Reporter: Jeesmon Jacob >Priority: Critical > Attachments: flink-operator-log.txt > > > During a session job submission if a deployment error (ex: > concurrent.TimeoutException) is hit, operator will submit the job again. But > first submission could have succeeded in jobManager side and second > submission could result in duplicate job. Operator log attached. > Per [~gyfora]: > The problem is that in case a deployment error was hit, the > SessionJobObserver will not be able to tell whether it has submitted the job > or not. So it will simply try to submit it again. We have to find a mechanism > to correlate Jobs on the cluster with the SessionJob CR itself. Maybe we > could override the job name itself for this purpose or something like that. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (FLINK-24666) Add job level "table.exec.state-stale.error-handling" option and apply to related stateful stream operators
[ https://issues.apache.org/jira/browse/FLINK-24666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee updated FLINK-24666: Description: In stream processing, records will be deleted when exceed state ttl (if configured), and when the corresponding record's update arrives again, the operator may not be able to handle it properly, we need a unified error handling mechanism to handle this situation, instead of each stateful operator currently handling its own. e.g., currently, there's a lenient option which was not exposed to users on RetractableTopNFunction {quote}// flag to skip records with non-exist error instead to fail, true by default. private final boolean lenient = true {quote} So there's no chance to raise the exception when the record(s) unexpectedly cleared by state ttl. Commonly this happens because a too shorter ttl at Line 190 or inconstancy between the two internal state(dataState and treeMap) at other place. {quote}List inputs = dataState.get(key); if (inputs == null) Unknown macro: \{ // Skip the data if it's state is cleared because of state ttl. if (lenient) Unknown macro} else Unknown macro: \{ throw new RuntimeException(STATE_CLEARED_WARN_MSG); } } {quote} We'd better to expose it to users (default value can be true to keep consistent with previous version) was: Currently, the lenient option was not exposed to users on RetractableTopNFunction {quote}// flag to skip records with non-exist error instead to fail, true by default. private final boolean lenient = true {quote} So there's no chance to raise the exception when the record(s) unexpectedly cleared by state ttl. Commonly this happens because a too shorter ttl at Line 190 or inconstancy between the two internal state(dataState and treeMap) at other place. {quote}List inputs = dataState.get(key); if (inputs == null) Unknown macro: \{ // Skip the data if it's state is cleared because of state ttl. 
if (lenient) Unknown macro} else Unknown macro: \{ throw new RuntimeException(STATE_CLEARED_WARN_MSG); } } {quote} We'd better to expose it to users (default value can be true to keep consistent with previous version) > Add job level "table.exec.state-stale.error-handling" option and apply to > related stateful stream operators > --- > > Key: FLINK-24666 > URL: https://issues.apache.org/jira/browse/FLINK-24666 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.14.0, 1.13.3 >Reporter: lincoln lee >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > In stream processing, records will be deleted when exceed state ttl (if > configured), and when the corresponding record's update arrives again, the > operator may not be able to handle it properly, we need a unified error > handling mechanism to handle this situation, > instead of each stateful operator currently handling its own. > e.g., currently, there's a lenient option which was not exposed to users on > RetractableTopNFunction > {quote}// flag to skip records with non-exist error instead to fail, true by > default. > private final boolean lenient = true > {quote} > So there's no chance to raise the exception when the record(s) unexpectedly > cleared by state ttl. Commonly this happens because a too shorter ttl at Line > 190 or inconstancy between the two internal state(dataState and treeMap) at > other place. > {quote}List inputs = dataState.get(key); > if (inputs == null) > Unknown macro: \{ // Skip the data if it's state is cleared because of state > ttl. if (lenient) Unknown macro} > else > Unknown macro: \{ throw new RuntimeException(STATE_CLEARED_WARN_MSG); } > } > {quote} > We'd better to expose it to users (default value can be true to keep > consistent with previous version) -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (FLINK-24666) Add job level "table.exec.state-stale.error-handling" option and apply to related stateful stream operators
[ https://issues.apache.org/jira/browse/FLINK-24666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee updated FLINK-24666:

Description:
In stream processing, records are deleted once they exceed the state TTL (if configured), and when a later update for such a record arrives, the operator may not be able to handle it properly. We need a unified error-handling mechanism for this situation, instead of each stateful operator currently handling it on its own. E.g., currently there is a lenient option on RetractableTopNFunction which is not exposed to users:
{quote}// flag to skip records with non-exist error instead to fail, true by default.
private final boolean lenient = true;
{quote}
So there is no chance to raise the exception when record(s) are unexpectedly cleared by the state TTL. Commonly this happens because of a too-short TTL (at Line 190) or an inconsistency between the two internal states (dataState and treeMap) elsewhere:
{quote}List<RowData> inputs = dataState.get(key);
if (inputs == null) {
    // Skip the data if it's state is cleared because of state ttl.
    if (lenient) {
        LOG.warn(STATE_CLEARED_WARN_MSG);
    } else {
        throw new RuntimeException(STATE_CLEARED_WARN_MSG);
    }
}
{quote}
We'd better expose it to users (the default value can be true to stay consistent with previous versions), and the handling should be unified to cover all stateful stream operators rather than being specific to RetractableTopNFunction.

was:
In stream processing, records are deleted once they exceed the state TTL (if configured), and when a later update for such a record arrives, the operator may not be able to handle it properly. We need a unified error-handling mechanism for this situation, instead of each stateful operator currently handling it on its own. E.g., currently there is a lenient option on RetractableTopNFunction which is not exposed to users:
{quote}// flag to skip records with non-exist error instead to fail, true by default.
private final boolean lenient = true;
{quote}
So there is no chance to raise the exception when record(s) are unexpectedly cleared by the state TTL. Commonly this happens because of a too-short TTL (at Line 190) or an inconsistency between the two internal states (dataState and treeMap) elsewhere:
{quote}List<RowData> inputs = dataState.get(key);
if (inputs == null) {
    // Skip the data if it's state is cleared because of state ttl.
    if (lenient) {
        LOG.warn(STATE_CLEARED_WARN_MSG);
    } else {
        throw new RuntimeException(STATE_CLEARED_WARN_MSG);
    }
}
{quote}
We'd better expose it to users (the default value can be true to stay consistent with previous versions).

> Add job level "table.exec.state-stale.error-handling" option and apply to
> related stateful stream operators
> ---
>
> Key: FLINK-24666
> URL: https://issues.apache.org/jira/browse/FLINK-24666
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Affects Versions: 1.14.0, 1.13.3
> Reporter: lincoln lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.16.0
>
> In stream processing, records are deleted once they exceed the state TTL (if
> configured), and when a later update for such a record arrives, the operator
> may not be able to handle it properly. We need a unified error-handling
> mechanism for this situation, instead of each stateful operator currently
> handling it on its own. E.g., currently there is a lenient option on
> RetractableTopNFunction which is not exposed to users:
> {quote}// flag to skip records with non-exist error instead to fail, true by default.
> private final boolean lenient = true;
> {quote}
> So there is no chance to raise the exception when record(s) are unexpectedly
> cleared by the state TTL. Commonly this happens because of a too-short TTL
> (at Line 190) or an inconsistency between the two internal states (dataState
> and treeMap) elsewhere:
> {quote}List<RowData> inputs = dataState.get(key);
> if (inputs == null) {
>     // Skip the data if it's state is cleared because of state ttl.
>     if (lenient) {
>         LOG.warn(STATE_CLEARED_WARN_MSG);
>     } else {
>         throw new RuntimeException(STATE_CLEARED_WARN_MSG);
>     }
> }
> {quote}
> We'd better expose it to users (the default value can be true to stay
> consistent with previous versions), and the handling should be unified to
> cover all stateful stream operators rather than being specific to
> RetractableTopNFunction.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
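The lenient/strict behavior the issue above describes can be sketched in plain Java. This is a simplified, hypothetical stand-in (a `HashMap` replaces Flink's TTL-backed `MapState`, and the class and message text are illustrative, not the actual RetractableTopNFunction code):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of the "lenient" state-stale handling described above.
// A plain Map stands in for the operator's keyed state; in Flink the state
// would be a MapState whose entries can be cleared by the configured TTL.
class StateStaleSketch {
    static final String STATE_CLEARED_WARN_MSG =
            "State was cleared by TTL; result may be incorrect.";

    final Map<String, List<String>> dataState = new HashMap<>();
    final boolean lenient;

    StateStaleSketch(boolean lenient) {
        this.lenient = lenient;
    }

    /** Returns true if the record was handled, false if skipped as stale. */
    boolean onRetraction(String key) {
        List<String> inputs = dataState.get(key);
        if (inputs == null) {
            if (lenient) {
                // lenient: warn and silently drop the stale record
                System.err.println("WARN: " + STATE_CLEARED_WARN_MSG);
                return false;
            }
            // strict: surface the stale state as a hard failure
            throw new RuntimeException(STATE_CLEARED_WARN_MSG);
        }
        return true;
    }
}
```

The proposed option would effectively let users choose between these two branches job-wide instead of the flag being hard-coded per operator.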
[GitHub] [flink] flinkbot commented on pull request #20052: [FLINK-28047][api] Deprecate StreamExecutionEnvironment#readFile()/readTextFile() methods
flinkbot commented on PR #20052: URL: https://github.com/apache/flink/pull/20052#issuecomment-1163228801 ## CI report: * 3806a24b58270e39434a7c2e3f85e284c0b55ec6 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-24666) Add job level "table.exec.state-stale.error-handling" option and apply to related stateful stream operators
[ https://issues.apache.org/jira/browse/FLINK-24666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee updated FLINK-24666:

Description:
Currently, the lenient option is not exposed to users on RetractableTopNFunction:
{quote}// flag to skip records with non-exist error instead to fail, true by default.
private final boolean lenient = true;
{quote}
So there is no chance to raise the exception when record(s) are unexpectedly cleared by the state TTL. Commonly this happens because of a too-short TTL (at Line 190) or an inconsistency between the two internal states (dataState and treeMap) elsewhere:
{quote}List<RowData> inputs = dataState.get(key);
if (inputs == null) {
    // Skip the data if it's state is cleared because of state ttl.
    if (lenient) {
        LOG.warn(STATE_CLEARED_WARN_MSG);
    } else {
        throw new RuntimeException(STATE_CLEARED_WARN_MSG);
    }
}
{quote}
We'd better expose it to users (the default value can be true to stay consistent with previous versions).

was:
Currently, the lenient option is not exposed to users on RetractableTopNFunction:
{quote}// flag to skip records with non-exist error instead to fail, true by default.
private final boolean lenient = true;
{quote}
So there is no chance to raise the exception when record(s) are unexpectedly cleared by the state TTL. Commonly this happens because of a too-short TTL (at Line 190) or an inconsistency between the two internal states (dataState and treeMap) elsewhere:
{quote}List<RowData> inputs = dataState.get(key);
if (inputs == null) {
    // Skip the data if it's state is cleared because of state ttl.
    if (lenient) {
        LOG.warn(STATE_CLEARED_WARN_MSG);
    } else {
        throw new RuntimeException(STATE_CLEARED_WARN_MSG);
    }
}
{quote}
We'd better expose it to users (the default value can be true to stay consistent with previous versions).

And completely resolve the inconsistency problem (different from the Line 190 case, which is uncontrollable by itself) between the two states as follows in another issue:
{quote}// a map state stores mapping from sort key to records list
private transient MapState<RowData, List<RowData>> dataState;

// a sorted map stores mapping from sort key to records count
private transient ValueState<SortedMap<RowData, Long>> treeMap;
{quote}

> Add job level "table.exec.state-stale.error-handling" option and apply to
> related stateful stream operators
> ---
>
> Key: FLINK-24666
> URL: https://issues.apache.org/jira/browse/FLINK-24666
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Affects Versions: 1.14.0, 1.13.3
> Reporter: lincoln lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.16.0
>
> Currently, the lenient option is not exposed to users on
> RetractableTopNFunction:
> {quote}// flag to skip records with non-exist error instead to fail, true by default.
> private final boolean lenient = true;
> {quote}
> So there is no chance to raise the exception when record(s) are unexpectedly
> cleared by the state TTL. Commonly this happens because of a too-short TTL
> (at Line 190) or an inconsistency between the two internal states (dataState
> and treeMap) elsewhere:
> {quote}List<RowData> inputs = dataState.get(key);
> if (inputs == null) {
>     // Skip the data if it's state is cleared because of state ttl.
>     if (lenient) {
>         LOG.warn(STATE_CLEARED_WARN_MSG);
>     } else {
>         throw new RuntimeException(STATE_CLEARED_WARN_MSG);
>     }
> }
> {quote}
> We'd better expose it to users (the default value can be true to stay
> consistent with previous versions).

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (FLINK-24666) Add job level "table.exec.state-stale.error-handling" option and apply to related stateful stream operators
[ https://issues.apache.org/jira/browse/FLINK-24666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee updated FLINK-24666:

Summary: Add job level "table.exec.state-stale.error-handling" option and apply to related stateful stream operators (was: Add job level "table.exec.state-staled.error-handling" option and apply to related stateful stream operators)

> Add job level "table.exec.state-stale.error-handling" option and apply to
> related stateful stream operators
> ---
>
> Key: FLINK-24666
> URL: https://issues.apache.org/jira/browse/FLINK-24666
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Affects Versions: 1.14.0, 1.13.3
> Reporter: lincoln lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.16.0
>
> Currently, the lenient option is not exposed to users on
> RetractableTopNFunction:
> {quote}// flag to skip records with non-exist error instead to fail, true by default.
> private final boolean lenient = true;
> {quote}
> So there is no chance to raise the exception when record(s) are unexpectedly
> cleared by the state TTL. Commonly this happens because of a too-short TTL
> (at Line 190) or an inconsistency between the two internal states (dataState
> and treeMap) elsewhere:
> {quote}List<RowData> inputs = dataState.get(key);
> if (inputs == null) {
>     // Skip the data if it's state is cleared because of state ttl.
>     if (lenient) {
>         LOG.warn(STATE_CLEARED_WARN_MSG);
>     } else {
>         throw new RuntimeException(STATE_CLEARED_WARN_MSG);
>     }
> }
> {quote}
> We'd better expose it to users (the default value can be true to stay
> consistent with previous versions).
>
> And completely resolve the inconsistency problem (different from the Line
> 190 case, which is uncontrollable by itself) between the two states as
> follows in another issue:
> {quote}// a map state stores mapping from sort key to records list
> private transient MapState<RowData, List<RowData>> dataState;
>
> // a sorted map stores mapping from sort key to records count
> private transient ValueState<SortedMap<RowData, Long>> treeMap;
> {quote}

-- This message was sent by Atlassian Jira (v8.20.7#820007)
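The "two internal states" the issue mentions can be illustrated with a small, self-contained sketch. Plain Java collections stand in for Flink's MapState and ValueState here (the class and method names are assumptions for illustration): dataState maps a sort key to its buffered records, treeMap maps a sort key to a record count, and they are consistent only when every count matches the corresponding list size — exactly the invariant that TTL clearing one state but not the other can break.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch (not Flink code) of the dataState/treeMap pair the
// issue describes, plus the consistency invariant between them.
class TwoStateSketch {
    // sort key -> buffered records (stands in for MapState)
    final Map<String, List<String>> dataState = new HashMap<>();
    // sort key -> record count (stands in for the ValueState-held SortedMap)
    final TreeMap<String, Long> treeMap = new TreeMap<>();

    void add(String sortKey, String record) {
        dataState.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(record);
        treeMap.merge(sortKey, 1L, Long::sum);
    }

    /** True when every count in treeMap matches the buffered records. */
    boolean isConsistent() {
        if (!dataState.keySet().equals(treeMap.keySet())) {
            return false;
        }
        for (Map.Entry<String, Long> e : treeMap.entrySet()) {
            if (dataState.get(e.getKey()).size() != e.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

When state TTL clears an entry from only one of the two structures, isConsistent() turns false — which is the situation the lenient flag currently papers over.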
[jira] [Commented] (FLINK-28047) Annotate StreamExecutionEnvironment#readFile()/readTextFile(*) methods deprecated in favor of FileSource#forRecordStreamFormat/forBulkFileFormat
[ https://issues.apache.org/jira/browse/FLINK-28047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557507#comment-17557507 ] Alexander Fedulov commented on FLINK-28047: --- [~martijnvisser] this is also done, PTAL. > Annotate StreamExecutionEnvironment#readFile()/readTextFile(*) methods > deprecated in favor of FileSource#forRecordStreamFormat/forBulkFileFormat > > > Key: FLINK-28047 > URL: https://issues.apache.org/jira/browse/FLINK-28047 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream, Connectors / FileSystem >Affects Versions: 1.15.2 >Reporter: Alexander Fedulov >Assignee: Alexander Fedulov >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink] flinkbot commented on pull request #20051: [FLINK-24666][table-runtime] Add job level table.exec.state-stale.error-handling option and apply to related stateful stream operators
flinkbot commented on PR #20051: URL: https://github.com/apache/flink/pull/20051#issuecomment-1163227528 ## CI report: * 36837b1b02a9ea3998c826b80377bcd1f518dffd UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] xuyangzhong commented on pull request #19866: [FLINK-27876][table-planner] choose right side as build side if left size is equal with right in semi/anti
xuyangzhong commented on PR #19866: URL: https://github.com/apache/flink/pull/19866#issuecomment-1163227335 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-28199) Failures on YARNHighAvailabilityITCase.testClusterClientRetrieval and YARNHighAvailabilityITCase.testKillYarnSessionClusterEntrypoint
[ https://issues.apache.org/jira/browse/FLINK-28199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557506#comment-17557506 ] Biao Geng edited comment on FLINK-28199 at 6/22/22 3:03 PM: According to the log, the exception is thrown because the yarn application for testKillYarnSessionClusterEntrypoint is not stopped as expected, which in turn also leads to the failure of testClusterClientRetrieval. Similarly to [~ferenc-csaky]'s analysis, IIUC, the PR for FLINK-27677 may not be relevant to this failure, as FLINK-27677's changes only touch the @AfterAll method, which should be executed after all tests have finished, while this failure happens after a single test. It looks like {{killApplicationAndWait()}} may return incorrectly due to a side effect of the previous test. was (Author: bgeng777): According to the log, the exception is thrown because the yarn application for testKillYarnSessionClusterEntrypoint is not stopped as expected, which in turn also leads to the failure of testClusterClientRetrieval. IIUC, the PR for FLINK-27677 may not be relevant to this failure, as [~ferenc-csaky]'s changes only touch the @AfterAll method, which should be executed after all tests have finished, while this failure happens after a single test. It looks like {{killApplicationAndWait()}} may return incorrectly due to a side effect of the previous test.

> Failures on YARNHighAvailabilityITCase.testClusterClientRetrieval and
> YARNHighAvailabilityITCase.testKillYarnSessionClusterEntrypoint
> -
>
> Key: FLINK-28199
> URL: https://issues.apache.org/jira/browse/FLINK-28199
> Project: Flink
> Issue Type: Bug
> Components: Deployment / YARN
> Affects Versions: 1.16.0
> Reporter: Martijn Visser
> Priority: Major
> Labels: test-stability
>
> {code:java}
> Jun 22 08:57:50 [ERROR] Errors:
> Jun 22 08:57:50 [ERROR]
> YARNHighAvailabilityITCase.testClusterClientRetrieval » Timeout
> testClusterCli...
> Jun 22 08:57:50 [ERROR] > YARNHighAvailabilityITCase.testKillYarnSessionClusterEntrypoint:156->YarnTestBase.runTest:288->lambda$testKillYarnSessionClusterEntrypoint$0:182->waitForJobTermination:325 > » Execution > Jun 22 08:57:50 [INFO] > Jun 22 08:57:50 [ERROR] Tests run: 27, Failures: 0, Errors: 2, Skipped: 0 > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37037=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=29523 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink] xuyangzhong commented on pull request #19850: [FLINK-27851][table-planner] support to access source pks through MiniBatchAssigner
xuyangzhong commented on PR #19850: URL: https://github.com/apache/flink/pull/19850#issuecomment-1163226270 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-28047) Annotate StreamExecutionEnvironment#readFile()/readTextFile(*) methods deprecated in favor of FileSource#forRecordStreamFormat/forBulkFileFormat
[ https://issues.apache.org/jira/browse/FLINK-28047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Fedulov reassigned FLINK-28047: - Assignee: Alexander Fedulov > Annotate StreamExecutionEnvironment#readFile()/readTextFile(*) methods > deprecated in favor of FileSource#forRecordStreamFormat/forBulkFileFormat > > > Key: FLINK-28047 > URL: https://issues.apache.org/jira/browse/FLINK-28047 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream, Connectors / FileSystem >Affects Versions: 1.15.2 >Reporter: Alexander Fedulov >Assignee: Alexander Fedulov >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-28199) Failures on YARNHighAvailabilityITCase.testClusterClientRetrieval and YARNHighAvailabilityITCase.testKillYarnSessionClusterEntrypoint
[ https://issues.apache.org/jira/browse/FLINK-28199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557506#comment-17557506 ] Biao Geng commented on FLINK-28199: --- According to the log, the exception is thrown because the yarn application for testKillYarnSessionClusterEntrypoint is not stopped as expected, which in turn also leads to the failure of testClusterClientRetrieval. IIUC, the PR for FLINK-27677 may not be relevant to this failure, as [~ferenc-csaky]'s changes only touch the @AfterAll method, which should be executed after all tests have finished, while this failure happens after a single test. It looks like {{killApplicationAndWait()}} may return incorrectly due to a side effect of the previous test.

> Failures on YARNHighAvailabilityITCase.testClusterClientRetrieval and
> YARNHighAvailabilityITCase.testKillYarnSessionClusterEntrypoint
> -
>
> Key: FLINK-28199
> URL: https://issues.apache.org/jira/browse/FLINK-28199
> Project: Flink
> Issue Type: Bug
> Components: Deployment / YARN
> Affects Versions: 1.16.0
> Reporter: Martijn Visser
> Priority: Major
> Labels: test-stability
>
> {code:java}
> Jun 22 08:57:50 [ERROR] Errors:
> Jun 22 08:57:50 [ERROR]
> YARNHighAvailabilityITCase.testClusterClientRetrieval » Timeout
> testClusterCli...
> Jun 22 08:57:50 [ERROR]
> YARNHighAvailabilityITCase.testKillYarnSessionClusterEntrypoint:156->YarnTestBase.runTest:288->lambda$testKillYarnSessionClusterEntrypoint$0:182->waitForJobTermination:325
> » Execution
> Jun 22 08:57:50 [INFO]
> Jun 22 08:57:50 [ERROR] Tests run: 27, Failures: 0, Errors: 2, Skipped: 0
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37037=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=29523

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (FLINK-28047) Annotate StreamExecutionEnvironment#readFile()/readTextFile(*) methods deprecated in favor of FileSource#forRecordStreamFormat/forBulkFileFormat
[ https://issues.apache.org/jira/browse/FLINK-28047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28047: --- Labels: pull-request-available (was: ) > Annotate StreamExecutionEnvironment#readFile()/readTextFile(*) methods > deprecated in favor of FileSource#forRecordStreamFormat/forBulkFileFormat > > > Key: FLINK-28047 > URL: https://issues.apache.org/jira/browse/FLINK-28047 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream, Connectors / FileSystem >Affects Versions: 1.15.2 >Reporter: Alexander Fedulov >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink] afedulov opened a new pull request, #20052: [FLINK-28047][api] Deprecate StreamExecutionEnvironment#readFile()/readTextFile() methods
afedulov opened a new pull request, #20052: URL: https://github.com/apache/flink/pull/20052 This is a trivial change that deprecates StreamExecutionEnvironment#readFile()/readTextFile() methods. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-24666) Add job level "table.exec.state-staled.error-handling" option and apply to related stateful stream operators
[ https://issues.apache.org/jira/browse/FLINK-24666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-24666: --- Labels: pull-request-available (was: )

> Add job level "table.exec.state-staled.error-handling" option and apply to
> related stateful stream operators
>
> Key: FLINK-24666
> URL: https://issues.apache.org/jira/browse/FLINK-24666
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Affects Versions: 1.14.0, 1.13.3
> Reporter: lincoln lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.16.0
>
> Currently, the lenient option is not exposed to users on
> RetractableTopNFunction:
> {quote}// flag to skip records with non-exist error instead to fail, true by default.
> private final boolean lenient = true;
> {quote}
> So there is no chance to raise the exception when record(s) are unexpectedly
> cleared by the state TTL. Commonly this happens because of a too-short TTL
> (at Line 190) or an inconsistency between the two internal states (dataState
> and treeMap) elsewhere:
> {quote}List<RowData> inputs = dataState.get(key);
> if (inputs == null) {
>     // Skip the data if it's state is cleared because of state ttl.
>     if (lenient) {
>         LOG.warn(STATE_CLEARED_WARN_MSG);
>     } else {
>         throw new RuntimeException(STATE_CLEARED_WARN_MSG);
>     }
> }
> {quote}
> We'd better expose it to users (the default value can be true to stay
> consistent with previous versions).
>
> And completely resolve the inconsistency problem (different from the Line
> 190 case, which is uncontrollable by itself) between the two states as
> follows in another issue:
> {quote}// a map state stores mapping from sort key to records list
> private transient MapState<RowData, List<RowData>> dataState;
>
> // a sorted map stores mapping from sort key to records count
> private transient ValueState<SortedMap<RowData, Long>> treeMap;
> {quote}

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [flink] lincoln-lil opened a new pull request, #20051: [FLINK-24666][table-runtime] Add job level table.exec.state-stale.error-handling option and apply to related stateful stream operators
lincoln-lil opened a new pull request, #20051: URL: https://github.com/apache/flink/pull/20051

## What is the purpose of the change

In stream processing, records are deleted once they exceed the state TTL (if configured), and when a later update for such a record arrives, the operator may not be able to handle it properly. We need a unified error-handling mechanism for this situation, instead of each stateful operator currently handling it on its own.

TODO: the documentation will be updated if there are no comments on the option description.

## Brief change log

* add 'table.exec.state-stale.error-handling' to ExecutionConfigOptions
* add utility class `ErrorHandlingUtil` for unified state-stale error handling
* apply the state-stale error-handling logic to related stateful streaming operators
* update existing tests to ensure error handling is covered

## Verifying this change

Updated existing tests.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
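The "unified state stale error handling" utility the PR describes might look roughly like the following sketch. The enum values and method name here are assumptions for illustration, not the PR's actual `ErrorHandlingUtil` API: the point is that every stateful operator delegates to one shared method driven by the job-level option, instead of hard-coding its own lenient flag.

```java
// Hypothetical sketch of a shared state-stale error-handling helper.
// Names (StateStaleErrorHandling, handleStateStale) are illustrative;
// the actual API introduced by the PR may differ.
enum StateStaleErrorHandling {
    LENIENT, // log a warning and skip the stale record
    ERROR    // fail the job with an exception
}

final class ErrorHandlingSketch {
    private ErrorHandlingSketch() {}

    /** Returns true when the caller should skip the stale record. */
    static boolean handleStateStale(StateStaleErrorHandling behavior, String msg) {
        if (behavior == StateStaleErrorHandling.LENIENT) {
            System.err.println("WARN: " + msg);
            return true;
        }
        throw new IllegalStateException(msg);
    }
}
```

An operator that finds its per-key state missing would then call `handleStateStale(configuredBehavior, STATE_CLEARED_WARN_MSG)` and skip the record when it returns true, keeping the skip-vs-fail decision in one place.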