Re: [PR] [FLINK-30064] Move existing Hive connector code from Flink repo to dedicated Hive repo [flink-connector-hive]
luoyuxia commented on PR #5: URL: https://github.com/apache/flink-connector-hive/pull/5#issuecomment-1837025511 Cool! @snuyanzin Thanks for the PR. Sorry for the late response; I've been busy with other things. I'll definitely have a look next week. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]
gyfora commented on PR #721: URL: https://github.com/apache/flink-kubernetes-operator/pull/721#issuecomment-1837021377 @mxm I would like to take a look tomorrow or Monday morning because today I'm traveling.
Re: [PR] [FLINK-33711] Fix numbers of field of the taxi event / Fix broken link [flink-training]
alpinegizmo merged PR #66: URL: https://github.com/apache/flink-training/pull/66
Re: [PR] [FLINK-33711] Fix numbers of field of the taxi event / Fix broken link [flink-training]
alpinegizmo commented on PR #66: URL: https://github.com/apache/flink-training/pull/66#issuecomment-1837007643 Thanks for the update!
[jira] [Closed] (FLINK-33698) Fix the backoff time calculation in ExponentialBackoffDelayRetryStrategy
[ https://issues.apache.org/jira/browse/FLINK-33698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee closed FLINK-33698.
Fix Version/s: 1.19.0
Resolution: Fixed
fixed in master: 5da214c963c219c8b3da727ffde5d6995b3770b8
> Fix the backoff time calculation in ExponentialBackoffDelayRetryStrategy
>
> Key: FLINK-33698
> URL: https://issues.apache.org/jira/browse/FLINK-33698
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Reporter: xiangyu feng
> Assignee: xiangyu feng
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.19.0
>
> The backoff time calculation in `ExponentialBackoffDelayRetryStrategy` should consider currentAttempts.
>
> Current Version:
> {code:java}
> @Override
> public long getBackoffTimeMillis(int currentAttempts) {
>     if (currentAttempts <= 1) {
>         // equivalent to initial delay
>         return lastRetryDelay;
>     }
>     long backoff = Math.min((long) (lastRetryDelay * multiplier), maxRetryDelay);
>     this.lastRetryDelay = backoff;
>     return backoff;
> }
> {code}
> Fixed Version:
> {code:java}
> @Override
> public long getBackoffTimeMillis(int currentAttempts) {
>     if (currentAttempts <= 1) {
>         // equivalent to initial delay
>         return initialDelay;
>     }
>     long backoff =
>             Math.min(
>                     (long) (initialDelay * Math.pow(multiplier, currentAttempts - 1)),
>                     maxRetryDelay);
>     return backoff;
> }
> {code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
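The fixed calculation above can be checked in isolation. The following is a minimal standalone sketch (not the actual Flink class) that mirrors the corrected formula: the delay grows as initialDelay * multiplier^(attempts - 1), capped at maxRetryDelay, with no mutable state between calls:

```java
public class ExponentialBackoffSketch {
    private final long initialDelay;
    private final long maxRetryDelay;
    private final double multiplier;

    public ExponentialBackoffSketch(long initialDelay, long maxRetryDelay, double multiplier) {
        this.initialDelay = initialDelay;
        this.maxRetryDelay = maxRetryDelay;
        this.multiplier = multiplier;
    }

    // Mirrors the fixed version: exponent depends on currentAttempts,
    // so repeated calls for the same attempt number return the same value.
    public long getBackoffTimeMillis(int currentAttempts) {
        if (currentAttempts <= 1) {
            // equivalent to initial delay
            return initialDelay;
        }
        return Math.min(
                (long) (initialDelay * Math.pow(multiplier, currentAttempts - 1)),
                maxRetryDelay);
    }

    public static void main(String[] args) {
        ExponentialBackoffSketch s = new ExponentialBackoffSketch(100L, 1000L, 2.0);
        System.out.println(s.getBackoffTimeMillis(1)); // 100
        System.out.println(s.getBackoffTimeMillis(3)); // 400 = 100 * 2^2
        System.out.println(s.getBackoffTimeMillis(5)); // 1000 (1600 capped at max)
    }
}
```

Note how the buggy version's dependence on `lastRetryDelay` made the result depend on call history rather than on `currentAttempts` alone, which is exactly what the fix removes.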
Re: [PR] [FLINK-33698][datastream] Fix the backoff time calculation in ExponentialBackoffDelayRetryStrategy [flink]
lincoln-lil merged PR #23830: URL: https://github.com/apache/flink/pull/23830
Re: [PR] Add Kafka connector v3.0.2 release [flink-web]
tzulitai closed pull request #700: Add Kafka connector v3.0.2 release URL: https://github.com/apache/flink-web/pull/700
[jira] [Closed] (FLINK-33638) Support variable-length data generation for variable-length data types
[ https://issues.apache.org/jira/browse/FLINK-33638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee closed FLINK-33638.
Resolution: Fixed
> Support variable-length data generation for variable-length data types
>
> Key: FLINK-33638
> URL: https://issues.apache.org/jira/browse/FLINK-33638
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Ecosystem
> Affects Versions: 1.19.0
> Reporter: Yubin Li
> Assignee: Yubin Li
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.19.0
>
> Currently, for variable-length data types (varchar, varbinary, string, bytes), the datagen connector always generates max-length data. We can extend datagen to generate variable-length values (using a new option to enable it, e.g. 'fields.f0.var-len'='true').
> The topic has been discussed in the mail thread
> https://lists.apache.org/thread/kp6popo4cnhl6vx31sdn6mlscpzj9tgc
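The core idea behind the var-len option can be sketched as follows. This is a hypothetical illustration only, not the actual datagen connector code: when the option is enabled, each generated value gets a random length in [1, maxLength] instead of always maxLength.

```java
import java.util.Random;

public class VarLenDataGenSketch {
    private static final Random RANDOM = new Random();
    private static final String CHARS = "abcdefghijklmnopqrstuvwxyz0123456789";

    // varLenEnabled models a hypothetical 'fields.f0.var-len'='true' setting:
    // enabled -> random length in [1, maxLength]; disabled -> always maxLength.
    public static String randomVarChar(int maxLength, boolean varLenEnabled) {
        int length = varLenEnabled ? 1 + RANDOM.nextInt(maxLength) : maxLength;
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(CHARS.charAt(RANDOM.nextInt(CHARS.length())));
        }
        return sb.toString();
    }
}
```

Generating varied lengths matters for realistic testing because always-max-length data hides bugs and skews size estimates for variable-length columns.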
[jira] [Updated] (FLINK-33638) Support variable-length data generation for variable-length data types
[ https://issues.apache.org/jira/browse/FLINK-33638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee updated FLINK-33638: Component/s: (was: Connectors / Common)
[jira] [Updated] (FLINK-33638) Support variable-length data generation for variable-length data types
[ https://issues.apache.org/jira/browse/FLINK-33638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee updated FLINK-33638: Component/s: Table SQL / Ecosystem
[jira] [Commented] (FLINK-33638) Support variable-length data generation for variable-length data types
[ https://issues.apache.org/jira/browse/FLINK-33638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17792277#comment-17792277 ] lincoln lee commented on FLINK-33638: fixed in master: ac707cfeb24585ec4d68ee3dee22b64e42c454b2
[jira] [Updated] (FLINK-33638) Support variable-length data generation for variable-length data types
[ https://issues.apache.org/jira/browse/FLINK-33638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee updated FLINK-33638: Fix Version/s: 1.19.0
Re: [PR] [FLINK-33638][table] Support variable-length data generation for variable-length data types [flink]
lincoln-lil merged PR #23810: URL: https://github.com/apache/flink/pull/23810
Re: [PR] [FLINK-33638][table] Support variable-length data generation for variable-length data types [flink]
lincoln-lil commented on PR #23810: URL: https://github.com/apache/flink/pull/23810#issuecomment-1836961492 looks good, merging.
Re: [PR] [FLINK-33667] Implement restore tests for MatchRecognize node [flink]
jnh5y commented on code in PR #23821: URL: https://github.com/apache/flink/pull/23821#discussion_r1412552616

## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeTestPrograms.java:

@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecMatch}. */
+public class MatchRecognizeTestPrograms {
+    static final Row[] SIMPLE_DATA = {
+        Row.of(1L, "a"),
+        Row.of(2L, "z"),
+        Row.of(3L, "b"),
+        Row.of(4L, "c"),
+        Row.of(5L, "d"),
+        Row.of(6L, "a"),
+        Row.of(7L, "b"),
+        Row.of(8L, "c"),
+        Row.of(9L, "a"),
+        Row.of(10L, "b")
+    };
+
+    static final Row[] SIMPLE_DATA2 = {Row.of(11L, "c")};
+
+    static final Row[] COMPLEX_DATA = {
+        Row.of("ACME", 1L, 19, 1),
+        Row.of("BETA", 2L, 18, 1),
+        Row.of("ACME", 3L, 17, 2),
+        Row.of("ACME", 4L, 13, 3),
+        Row.of("BETA", 5L, 16, 2),
+        Row.of("ACME", 6L, 20, 4)
+    };
+
+    static final Row[] COMPLEX_DATA2 = {Row.of("BETA", 7L, 22, 4)};
+
+    static final TableTestProgram SIMPLE_MATCH =
+            TableTestProgram.of("simple-match", "simple match recognize test")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("MyTable")
+                                    .addSchema(
+                                            "id bigint", "name varchar", "proctime as PROCTIME()")
+                                    .producedBeforeRestore(SIMPLE_DATA)
+                                    .producedAfterRestore(SIMPLE_DATA2)
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("MySink")
+                                    .addSchema("a bigint", "b bigint", "c bigint")
+                                    .consumedBeforeRestore(Row.of(6L, 7L, 8L))
+                                    .consumedAfterRestore(Row.of(9L, 10L, 11L))
+                                    .build())
+                    .runSql(
+                            "insert into MySink"
+                                    + " SELECT T.aid, T.bid, T.cid\n"
+                                    + " FROM MyTable MATCH_RECOGNIZE (\n"
+                                    + " ORDER BY proctime\n"
+                                    + " MEASURES\n"
+                                    + " `A\"`.id AS aid,\n"
+                                    + " \u006C.id AS bid,\n"
+                                    + " C.id AS cid\n"
+                                    + " PATTERN (`A\"` \u006C C)\n"
+                                    + " DEFINE\n"
+                                    + " `A\"` AS name = 'a',\n"
+                                    + " \u006C AS name = 'b',\n"
+                                    + " C AS name = 'c'\n"
+                                    + " ) AS T")
+                    .build();
+
+    static final TableTestProgram COMPLEX_MATCH =
+            TableTestProgram.of("complex-match", "complex match recognize test")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("MyTable")
+                                    .addSchema(
+                                            "symbol string",
+                                            "tstamp bigint",
+                                            "price int",
+                                            "tax int",
+                                            "proctime as PROCTIME()")

Review Comment:
   Nice; thanks for the suggestion. I've added two tests in
[PR] Bump org.opensearch:opensearch from 1.3.0 to 2.11.1 in /flink-connector-opensearch [flink-connector-opensearch]
dependabot[bot] opened a new pull request, #37: URL: https://github.com/apache/flink-connector-opensearch/pull/37

Bumps [org.opensearch:opensearch](https://github.com/opensearch-project/OpenSearch) from 1.3.0 to 2.11.1.

Release notes sourced from org.opensearch:opensearch's releases (https://github.com/opensearch-project/OpenSearch/releases):

2.11.1 (2023-11-20) — Version 2.11.1 Release Notes

[2.11.1] Changed
- Use iterative approach to evaluate Regex.simpleMatch (#11060)

[2.11.1] Fixed
- [BUG] Disable sort optimization for HALF_FLOAT (#10999)
- Adding version condition while adding geoshape doc values to the index, to ensure backward compatibility (#11095)
- Remove shadowJar from lang-painless module publication (#11369)

2.11.0 (2023-10-12) — Version 2.11.0 Release Notes

[2.11] Added
- Add coordinator level stats for search latency (#8386)
- Add metrics for thread_pool task wait time (#9681)
- Async blob read support for S3 plugin (#9694)
- [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter (#9666)
- Async blob read support for encrypted containers (#10131)
- Implement Visitor Design pattern in QueryBuilder to enable the capability to traverse through the complex QueryBuilder tree (#10110)
- Add capability to restrict async durability mode for remote indexes (#10189)
- Add Doc Status Counter for Indexing Engine (#4562)
- Add unreferenced file cleanup count to merge stats (#10204)
- Configurable merge policy for index with an option to choose from LogByteSize and Tiered merge policy (#9992)
- [Remote Store] Add support to restrict creation deletion if system repository and mutation of immutable settings of system repository (#9839)
- Improve compressed request handling (#10261)

[2.11] Dependencies
- Bump JNA version from 5.5 to 5.13 (#9963)
- Bump peter-evans/create-or-update-comment from 2 to 3 (#9575)
- Bump actions/checkout from 2 to 4 (#9968)
- Bump OpenTelemetry from 1.26.0 to 1.30.1 (#9950)
- Bump org.apache.commons:commons-compress from 1.23.0 to 1.24.0 (#9973, #9972)
- Bump com.google.cloud:google-cloud-core-http from 2.21.1 to 2.23.0 (#9971)
- Bump mockito from 5.4.0 to 5.5.0 (#10022)
- Bump bytebuddy from 1.14.3 to 1.14.7 (#10022)
- Bump com.zaxxer:SparseBitSet from 1.2 to 1.3 (#10098)
- Bump tibdex/github-app-token from 1.5.0 to 2.1.0 (#10125)
- Bump org.wiremock:wiremock-standalone from 2.35.0 to 3.1.0 (#9752)
- Bump org.eclipse.jgit from 6.5.0 to 6.7.0 (#10147)
- Bump codecov/codecov-action from 2 to 3 (#10209)
- Bump com.google.http-client:google-http-client-jackson2 from 1.43.2 to 1.43.3 (#10126)
- Bump
Re: [PR] [FLINK-33672] Use MapState.entries() instead of keys() and get() in over window [flink]
flinkbot commented on PR #23855: URL: https://github.com/apache/flink/pull/23855#issuecomment-1836512337 ## CI report: * ef1b449c5654fb6430d486a5e16a00cbe7d93101 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-33672) Use MapState.entries() instead of keys() and get() in over window
[ https://issues.apache.org/jira/browse/FLINK-33672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33672: Labels: pull-request-available (was: )
> Use MapState.entries() instead of keys() and get() in over window
>
> Key: FLINK-33672
> URL: https://issues.apache.org/jira/browse/FLINK-33672
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Reporter: Zakelly Lan
> Assignee: Zakelly Lan
> Priority: Major
> Labels: pull-request-available
>
> In code logic related to over windows, such as
> org.apache.flink.table.runtime.operators.over.ProcTimeRangeBoundedPrecedingFunction:
> {code:java}
> private transient MapState<Long, List<RowData>> inputState;
>
> public void onTimer(
>         long timestamp,
>         KeyedProcessFunction.OnTimerContext ctx,
>         Collector out)
>         throws Exception {
>     // ...
>     Iterator iter = inputState.keys().iterator();
>     // ...
>     while (iter.hasNext()) {
>         Long elementKey = iter.next();
>         if (elementKey < limit) {
>             // element key outside of window. Retract values
>             List elementsRemove = inputState.get(elementKey);
>             // ...
>         }
>     }
>     // ...
> }
> {code}
> As we can see, there is a combination of key iteration and getting the value for the iterated key from inputState. However, for RocksDB, the key iteration calls entry iteration, which means we could replace it with entry iteration without introducing any extra overhead. As a result, we could save a call to get() by using getValue() on the iterated entry, at very low cost.
[PR] [FLINK-33672] Use MapState.entries() instead of keys() and get() in over window [flink]
Zakelly opened a new pull request, #23855: URL: https://github.com/apache/flink/pull/23855

## What is the purpose of the change

This PR replaces any combination of key iteration and getting the value for the iterated key from MapState with an entry iteration followed by getKey() and getValue() on the iterated entry. This saves the overhead of a get() call on MapState.

## Brief change log

Equivalent transformation of code in package `org.apache.flink.table.runtime.operators.over`.

## Verifying this change

This change is a trivial rework without any new test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
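The transformation described in this PR can be illustrated with plain java.util.Map (a standalone analogy only; Flink's MapState has the same keys()/get()/entries() shape but is a different API):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EntryIterationSketch {
    // Before: iterate keys, then call get() for each matching key -- one extra lookup per element.
    static int retractBefore(Map<Long, List<String>> state, long limit) {
        int retracted = 0;
        for (Long elementKey : state.keySet()) {
            if (elementKey < limit) {
                List<String> elementsRemove = state.get(elementKey); // second lookup
                retracted += elementsRemove.size();
            }
        }
        return retracted;
    }

    // After: iterate entries; key and value come from the same iteration step, no extra get().
    static int retractAfter(Map<Long, List<String>> state, long limit) {
        int retracted = 0;
        for (Map.Entry<Long, List<String>> entry : state.entrySet()) {
            if (entry.getKey() < limit) {
                retracted += entry.getValue().size();
            }
        }
        return retracted;
    }

    // Hypothetical sample state used for demonstration.
    static Map<Long, List<String>> buildSampleState() {
        Map<Long, List<String>> state = new HashMap<>();
        state.put(1L, Arrays.asList("a", "b"));
        state.put(5L, Arrays.asList("c"));
        return state;
    }
}
```

For an in-memory map the saving is small; the PR's point is that for RocksDB-backed state the key iterator already walks entries internally, so switching to entry iteration removes a whole state lookup per element for free.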
Re: [PR] [FLINK-33584][Filesystems] Update Hadoop Filesystem dependencies to 3.3.6 [flink]
snuyanzin commented on code in PR #23844: URL: https://github.com/apache/flink/pull/23844#discussion_r1412350731

## flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java:

@@ -68,7 +68,7 @@ public HadoopS3AccessHelper(S3AFileSystem s3a, Configuration conf) {
     @Override
     public String startMultiPartUpload(String key) throws IOException {
-        return s3accessHelper.initiateMultiPartUpload(key);
+        return s3accessHelper.initiateMultiPartUpload(key, null);

Review Comment:
   yes, makes sense done

@@ -84,13 +84,14 @@ public UploadPartResult uploadPart(
                 null,
                 inputFile,
                 0L);
-        return s3accessHelper.uploadPart(uploadRequest);
+        return s3accessHelper.uploadPart(uploadRequest, null);
     }

     @Override
     public PutObjectResult putObject(String key, File inputFile) throws IOException {
-        final PutObjectRequest putRequest = s3accessHelper.createPutObjectRequest(key, inputFile);
-        return s3accessHelper.putObject(putRequest);
+        final PutObjectRequest putRequest =
+                s3accessHelper.createPutObjectRequest(key, inputFile, null);
+        return s3accessHelper.putObject(putRequest, null, null);

Review Comment:
   done
Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]
afedulov commented on code in PR #721: URL: https://github.com/apache/flink-kubernetes-operator/pull/721#discussion_r1412305985

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:

@@ -180,13 +190,28 @@ public void reconcile(FlinkResourceContext ctx) throws Exception {
         }
     }

-    private void applyAutoscaler(FlinkResourceContext ctx) throws Exception {
+    private void applyAutoscaler(FlinkResourceContext ctx, @Nullable String existingOverrides)
+            throws Exception {
         var autoScalerCtx = ctx.getJobAutoScalerContext();
         boolean autoscalerEnabled =
                 ctx.getResource().getSpec().getJob() != null
                         && ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED);
         autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled);
+        autoscaler.scale(autoScalerCtx);

Review Comment:
   Makes sense, thanks.
Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]
mxm commented on code in PR #721: URL: https://github.com/apache/flink-kubernetes-operator/pull/721#discussion_r1412287210 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java: Review Comment: Yes, that is correct.
Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]
mxm commented on code in PR #721: URL: https://github.com/apache/flink-kubernetes-operator/pull/721#discussion_r1412278454

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:

@@ -180,13 +190,28 @@ public void reconcile(FlinkResourceContext ctx) throws Exception {
         }
     }

-    private void applyAutoscaler(FlinkResourceContext ctx) throws Exception {
+    private void applyAutoscaler(FlinkResourceContext ctx, @Nullable String existingOverrides)
+            throws Exception {
         var autoScalerCtx = ctx.getJobAutoScalerContext();
         boolean autoscalerEnabled =
                 ctx.getResource().getSpec().getJob() != null
                         && ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED);
         autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled);
+        autoscaler.scale(autoScalerCtx);
+
+        // Check that the overrides actually changed and not merely the String representation
+        var flinkConfig = ctx.getResource().getSpec().getFlinkConfiguration();
+        var newOverrides = flinkConfig.get(PipelineOptions.PARALLELISM_OVERRIDES.key());
+        if (existingOverrides != null && newOverrides != null) {

Review Comment:
   The rewritten version using Gyula's suggestions doesn't require this anymore.
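The underlying problem in this thread is that permutations of the same overrides produce different raw Strings. A standalone sketch of the comparison idea (hypothetical helper, not the operator's actual code; it assumes the `key:value,key:value` serialization used for map-typed options like parallelism overrides):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class OverridesComparisonSketch {
    // Parse "vertexId:parallelism,..." into a map so that permutations of the
    // same overrides compare equal, unlike their raw String representations.
    static Map<String, String> parseOverrides(String overrides) {
        return Arrays.stream(overrides.split(","))
                .map(kv -> kv.split(":", 2))
                .collect(Collectors.toMap(kv -> kv[0], kv -> kv[1]));
    }

    // An upgrade should only be triggered when the parsed maps actually differ.
    static boolean overridesChanged(String existing, String updated) {
        return !parseOverrides(existing).equals(parseOverrides(updated));
    }
}
```

Comparing the parsed maps instead of the Strings means "v1:2,v2:4" and "v2:4,v1:2" are treated as the same configuration, so reordering alone no longer triggers a cluster upgrade.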
Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]
mxm commented on code in PR #721: URL: https://github.com/apache/flink-kubernetes-operator/pull/721#discussion_r1412274565

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:

@@ -137,7 +141,13 @@ public void reconcile(FlinkResourceContext ctx) throws Exception {
                 cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
         SPEC currentDeploySpec = cr.getSpec();

-        applyAutoscaler(ctx);
+        applyAutoscaler(
+                ctx,
+                lastReconciledSpec != null

Review Comment:
   @gyfora Please check out the newest commit.
[jira] [Closed] (FLINK-33470) Implement restore tests for Join node
[ https://issues.apache.org/jira/browse/FLINK-33470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz closed FLINK-33470.
Resolution: Implemented
Implemented in e886dfdda6cd927548c8af0a88e78171e7ba34a8..5edc7d7b18e88cc86e84d197202d8cbb40621864
> Implement restore tests for Join node
>
> Key: FLINK-33470
> URL: https://issues.apache.org/jira/browse/FLINK-33470
> Project: Flink
> Issue Type: Sub-task
> Reporter: Jim Hughes
> Assignee: Jim Hughes
> Priority: Major
> Labels: pull-request-available
Re: [PR] [FLINK-33470] Implement restore tests for Join node [flink]
dawidwys merged PR #23680: URL: https://github.com/apache/flink/pull/23680
[jira] [Closed] (FLINK-32878) Add entropy to gcs path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-32878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-32878. -- Resolution: Fixed > Add entropy to gcs path for better scalability > -- > > Key: FLINK-32878 > URL: https://issues.apache.org/jira/browse/FLINK-32878 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.15.4, 1.16.2, 1.17.1 >Reporter: Jayadeep Jayaraman >Assignee: Cheena >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > Currently GCS is used as a backend for both checkpointing and sink. In both > these cases the file names are sequential which causes hotspotting in GCS and > results in HTTP 5XX status code. > > As per [GCS best > practices]([https://cloud.google.com/storage/docs/request-rate)] it is > advisable to spread out the object creation with random names close to the > beginning of the file name. > > There is similar work done already for S3 as part of FLINK-9061 -- This message was sent by Atlassian Jira (v8.20.10#820010)
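The best practice referenced in the issue, spreading object creation across random name prefixes near the beginning of the key, can be sketched as follows. This is a minimal illustration of the idea only; the marker name, class, and method are hypothetical and do not reflect Flink's actual entropy-injection code for filesystem paths.

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch: replace a marker segment near the start of the object
// key with random hex characters, so sequential checkpoint files do not share
// a common prefix (the hotspotting problem described in the issue).
public final class EntropyKeySketch {
    static final String ENTROPY_MARKER = "_entropy_";
    static final String HEX = "0123456789abcdef";

    // Substitute `length` random hex characters for the marker, if present;
    // paths without the marker are returned unchanged.
    public static String resolveEntropy(String path, int length) {
        if (!path.contains(ENTROPY_MARKER)) {
            return path;
        }
        StringBuilder entropy = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            entropy.append(HEX.charAt(ThreadLocalRandom.current().nextInt(HEX.length())));
        }
        return path.replace(ENTROPY_MARKER, entropy.toString());
    }
}
```

With a checkpoint base path like `gs://bucket/_entropy_/checkpoints`, each resolved key then starts with a different random segment, which is what distributes the request load in GCS.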
[jira] [Commented] (FLINK-32878) Add entropy to gcs path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-32878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17792115#comment-17792115 ] Martijn Visser commented on FLINK-32878: I reverted this change via c7da98f23c3f86de3a3b12355fa7a1289200f93d because CI indicated that it had failed and I couldn't compile it locally. However, CI was failing because of an unrelated error (download issue on NPM) and my local build failed because of my own mistake. So I re-enabled it via 87d7b4abd0a1ec92433603c83401cc8ad00fd500 > Add entropy to gcs path for better scalability > -- > > Key: FLINK-32878 > URL: https://issues.apache.org/jira/browse/FLINK-32878 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.15.4, 1.16.2, 1.17.1 >Reporter: Jayadeep Jayaraman >Assignee: Cheena >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > Currently GCS is used as a backend for both checkpointing and sink. In both > these cases the file names are sequential which causes hotspotting in GCS and > results in HTTP 5XX status code. > > As per [GCS best > practices]([https://cloud.google.com/storage/docs/request-rate)] it is > advisable to spread out the object creation with random names close to the > beginning of the file name. > > There is similar work done already for S3 as part of FLINK-9061 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Clarity And Impact: Revamped Flink Feature Descriptions [flink]
flinkbot commented on PR #23854: URL: https://github.com/apache/flink/pull/23854#issuecomment-1836316123 ## CI report: * 6fee7e7286b062c51a2153322fab684baadfd813 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33702) Add IncrementalDelayRetryStrategy implementation of RetryStrategy
[ https://issues.apache.org/jira/browse/FLINK-33702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiangyu feng updated FLINK-33702: - Summary: Add IncrementalDelayRetryStrategy implementation of RetryStrategy (was: Add IncrementalDelayRetryStrategy in AsyncRetryStrategies ) > Add IncrementalDelayRetryStrategy implementation of RetryStrategy > - > > Key: FLINK-33702 > URL: https://issues.apache.org/jira/browse/FLINK-33702 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Reporter: xiangyu feng >Priority: Major > Labels: pull-request-available > > RetryStrategy now supports FixedRetryStrategy and > ExponentialBackoffRetryStrategy. > In certain scenarios, we also need IncrementalDelayRetryStrategy to reduce > the retry count and perform the action more timely. > IncrementalDelayRetryStrategy will increase the retry delay at a fixed rate > for each attempt. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33702) Add IncrementalDelayRetryStrategy in AsyncRetryStrategies
[ https://issues.apache.org/jira/browse/FLINK-33702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiangyu feng updated FLINK-33702: - Description: RetryStrategy now supports FixedRetryStrategy and ExponentialBackoffRetryStrategy. In certain scenarios, we also need IncrementalDelayRetryStrategy to reduce the retry count and perform the action more timely. IncrementalDelayRetryStrategy will increase the retry delay at a fixed rate for each attempt. was: AsyncRetryStrategies now supports NoRetryStrategy, FixedDelayRetryStrategy and ExponentialBackoffDelayRetryStrategy. In certain scenarios, we also need IncrementalDelayRetryStrategy to reduce the retry count and perform the action more timely. IncrementalDelayRetryStrategy will increase the retry delay at a fixed rate for each attempt. > Add IncrementalDelayRetryStrategy in AsyncRetryStrategies > -- > > Key: FLINK-33702 > URL: https://issues.apache.org/jira/browse/FLINK-33702 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Reporter: xiangyu feng >Priority: Major > Labels: pull-request-available > > RetryStrategy now supports FixedRetryStrategy and > ExponentialBackoffRetryStrategy. > In certain scenarios, we also need IncrementalDelayRetryStrategy to reduce > the retry count and perform the action more timely. > IncrementalDelayRetryStrategy will increase the retry delay at a fixed rate > for each attempt. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (FLINK-33702) Add IncrementalDelayRetryStrategy in AsyncRetryStrategies
[ https://issues.apache.org/jira/browse/FLINK-33702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiangyu feng reopened FLINK-33702: -- `RetryStrategy` in org.apache.flink.util.concurrent can be used in `CollectResultFetcher`. > Add IncrementalDelayRetryStrategy in AsyncRetryStrategies > -- > > Key: FLINK-33702 > URL: https://issues.apache.org/jira/browse/FLINK-33702 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Reporter: xiangyu feng >Priority: Major > Labels: pull-request-available > > AsyncRetryStrategies now supports NoRetryStrategy, FixedDelayRetryStrategy > and ExponentialBackoffDelayRetryStrategy. > In certain scenarios, we also need IncrementalDelayRetryStrategy to reduce > the retry count and perform the action more timely. > IncrementalDelayRetryStrategy will increase the retry delay at a fixed rate > for each attempt. -- This message was sent by Atlassian Jira (v8.20.10#820010)
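The strategy described in the issue, a retry delay that grows by a fixed increment per attempt, can be sketched as below. Class and method names mirror the style of Flink's `RetryStrategy` in `org.apache.flink.util.concurrent` but are illustrative only; the cap on the maximum delay is an assumption, not something the issue specifies.

```java
import java.time.Duration;

// Sketch of an incremental-delay retry strategy: each attempt's delay is the
// previous delay plus a fixed increment, optionally capped at a maximum.
public final class IncrementalDelayRetryStrategy {
    private final int remainingRetries;
    private final Duration currentDelay;
    private final Duration increment;
    private final Duration maxDelay;

    public IncrementalDelayRetryStrategy(
            int remainingRetries, Duration initialDelay, Duration increment, Duration maxDelay) {
        this.remainingRetries = remainingRetries;
        this.currentDelay = initialDelay;
        this.increment = increment;
        this.maxDelay = maxDelay;
    }

    public int getNumRemainingRetries() {
        return remainingRetries;
    }

    public Duration getRetryDelay() {
        return currentDelay;
    }

    // The next strategy has one fewer retry and a delay increased by the
    // fixed increment, never exceeding maxDelay.
    public IncrementalDelayRetryStrategy getNextRetryStrategy() {
        Duration next = currentDelay.plus(increment);
        if (next.compareTo(maxDelay) > 0) {
            next = maxDelay;
        }
        return new IncrementalDelayRetryStrategy(remainingRetries - 1, next, increment, maxDelay);
    }
}
```

Compared to an exponential backoff, the delay here grows linearly (100ms, 150ms, 200ms, ...), which is the "reduce the retry count and perform the action more promptly" trade-off the issue describes.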
Re: [PR] [FLINK-33667] Implement restore tests for MatchRecognize node [flink]
dawidwys commented on code in PR #23821: URL: https://github.com/apache/flink/pull/23821#discussion_r1412233154 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeTestPrograms.java: ## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecMatch}. 
*/ +public class MatchRecognizeTestPrograms { +static final Row[] SIMPLE_DATA = { +Row.of(1L, "a"), +Row.of(2L, "z"), +Row.of(3L, "b"), +Row.of(4L, "c"), +Row.of(5L, "d"), +Row.of(6L, "a"), +Row.of(7L, "b"), +Row.of(8L, "c"), +Row.of(9L, "a"), +Row.of(10L, "b") +}; + +static final Row[] SIMPLE_DATA2 = {Row.of(11L, "c")}; + +static final Row[] COMPLEX_DATA = { +Row.of("ACME", 1L, 19, 1), +Row.of("BETA", 2L, 18, 1), +Row.of("ACME", 3L, 17, 2), +Row.of("ACME", 4L, 13, 3), +Row.of("BETA", 5L, 16, 2), +Row.of("ACME", 6L, 20, 4) +}; + +static final Row[] COMPLEX_DATA2 = {Row.of("BETA", 7L, 22, 4)}; + +static final TableTestProgram SIMPLE_MATCH = +TableTestProgram.of("simple-match", "simple match recognize test") +.setupTableSource( +SourceTestStep.newBuilder("MyTable") +.addSchema( +"id bigint", "name varchar", "proctime as PROCTIME()") +.producedBeforeRestore(SIMPLE_DATA) +.producedAfterRestore(SIMPLE_DATA2) +.build()) +.setupTableSink( +SinkTestStep.newBuilder("MySink") +.addSchema("a bigint", "b bigint", "c bigint") +.consumedBeforeRestore(Row.of(6L, 7L, 8L)) +.consumedAfterRestore(Row.of(9L, 10L, 11L)) +.build()) +.runSql( +"insert into MySink" ++ " SELECT T.aid, T.bid, T.cid\n" ++ " FROM MyTable MATCH_RECOGNIZE (\n" ++ " ORDER BY proctime\n" ++ " MEASURES\n" ++ " `A\"`.id AS aid,\n" ++ " \u006C.id AS bid,\n" ++ " C.id AS cid\n" ++ " PATTERN (`A\"` \u006C C)\n" ++ " DEFINE\n" ++ " `A\"` AS name = 'a',\n" ++ " \u006C AS name = 'b',\n" ++ " C AS name = 'c'\n" ++ " ) AS T") +.build(); + +static final TableTestProgram COMPLEX_MATCH = +TableTestProgram.of("complex-match", "complex match recognize test") +.setupTableSource( +SourceTestStep.newBuilder("MyTable") +.addSchema( +"symbol string", +"tstamp bigint", +"price int", +"tax int", +"proctime as PROCTIME()") Review Comment: > Sure. Does MR do anything different depending on rowtime vs proctime? Yes, substantially. Even if it didn't the
[jira] [Reopened] (FLINK-32878) Add entropy to gcs path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-32878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser reopened FLINK-32878: > Add entropy to gcs path for better scalability > -- > > Key: FLINK-32878 > URL: https://issues.apache.org/jira/browse/FLINK-32878 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.15.4, 1.16.2, 1.17.1 >Reporter: Jayadeep Jayaraman >Assignee: Cheena >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > Currently GCS is used as a backend for both checkpointing and sink. In both > these cases the file names are sequential which causes hotspotting in GCS and > results in HTTP 5XX status code. > > As per [GCS best > practices]([https://cloud.google.com/storage/docs/request-rate)] it is > advisable to spread out the object creation with random names close to the > beginning of the file name. > > There is similar work done already for S3 as part of FLINK-9061 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]
1996fanrui commented on code in PR #721: URL: https://github.com/apache/flink-kubernetes-operator/pull/721#discussion_r1412210561 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java: ## @@ -180,13 +190,28 @@ public void reconcile(FlinkResourceContext ctx) throws Exception { } } -private void applyAutoscaler(FlinkResourceContext ctx) throws Exception { +private void applyAutoscaler(FlinkResourceContext ctx, @Nullable String existingOverrides) +throws Exception { var autoScalerCtx = ctx.getJobAutoScalerContext(); boolean autoscalerEnabled = ctx.getResource().getSpec().getJob() != null && ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED); autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled); + autoscaler.scale(autoScalerCtx); Review Comment: If I understand correctly, the root cause is the logic related to `boolean specChanged` in `AbstractFlinkResourceReconciler#reconcile`: `specChanged` becomes true when the ordering of the parallelism overrides changes, and the operator will then scale the job. Right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33638][table] Support variable-length data generation for variable-length data types [flink]
liyubin117 commented on PR #23810: URL: https://github.com/apache/flink/pull/23810#issuecomment-1836237987 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-32815][table] Add HASHCODE function. [flink]
MartijnVisser commented on PR #23184: URL: https://github.com/apache/flink/pull/23184#issuecomment-1836233132 Closing this PR since we don't backport new features into existing releases -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]
gyfora commented on code in PR #721: URL: https://github.com/apache/flink-kubernetes-operator/pull/721#discussion_r1412198646 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java: ## @@ -180,13 +190,28 @@ public void reconcile(FlinkResourceContext ctx) throws Exception { } } -private void applyAutoscaler(FlinkResourceContext ctx) throws Exception { +private void applyAutoscaler(FlinkResourceContext ctx, @Nullable String existingOverrides) +throws Exception { var autoScalerCtx = ctx.getJobAutoScalerContext(); boolean autoscalerEnabled = ctx.getResource().getSpec().getJob() != null && ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED); autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled); + autoscaler.scale(autoScalerCtx); + +// Check that the overrides actually changed and not merely the String representation +var flinkConfig = ctx.getResource().getSpec().getFlinkConfiguration(); +var newOverrides = flinkConfig.get(PipelineOptions.PARALLELISM_OVERRIDES.key()); Review Comment: In general due to possible deprecated and other keys in the ConfigOption -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-32815][table] Add HASHCODE function. [flink]
MartijnVisser closed pull request #23184: [BP-1.17][FLINK-32815][table] Add HASHCODE function. URL: https://github.com/apache/flink/pull/23184 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]
1996fanrui commented on code in PR #721: URL: https://github.com/apache/flink-kubernetes-operator/pull/721#discussion_r1412192687 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java: ## @@ -180,13 +190,28 @@ public void reconcile(FlinkResourceContext ctx) throws Exception { } } -private void applyAutoscaler(FlinkResourceContext ctx) throws Exception { +private void applyAutoscaler(FlinkResourceContext ctx, @Nullable String existingOverrides) +throws Exception { var autoScalerCtx = ctx.getJobAutoScalerContext(); boolean autoscalerEnabled = ctx.getResource().getSpec().getJob() != null && ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED); autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled); + autoscaler.scale(autoScalerCtx); + +// Check that the overrides actually changed and not merely the String representation +var flinkConfig = ctx.getResource().getSpec().getFlinkConfiguration(); +var newOverrides = flinkConfig.get(PipelineOptions.PARALLELISM_OVERRIDES.key()); +if (existingOverrides != null && newOverrides != null) { Review Comment: ```suggestion if (existingOverrides != null && newOverrides != null && !existingOverrides.equals(newOverrides)) { ``` When `existingOverrides.equals(newOverrides)`, we don't need to execute this logic. So we can add one condition here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP1.17][FLINK-32720][table] Add built-in GENERATE_SERIES function [flink]
MartijnVisser commented on PR #23190: URL: https://github.com/apache/flink/pull/23190#issuecomment-1836227462 Closing this PR since we don't backport new features into existing releases -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-32565][table] Support Cast From NUMBER to BYTES [flink]
MartijnVisser commented on PR #23189: URL: https://github.com/apache/flink/pull/23189#issuecomment-1836227280 Closing this PR since we don't backport new features into existing releases -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-32264][table] Add-built-in FIELD-function. [flink]
MartijnVisser closed pull request #23176: [BP-1.17][FLINK-32264][table] Add-built-in FIELD-function. URL: https://github.com/apache/flink/pull/23176 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-32564][table] Support Cast From BYTES to NUMBER [flink]
MartijnVisser commented on PR #23188: URL: https://github.com/apache/flink/pull/23188#issuecomment-1836227209 Closing this PR since we don't backport new features into existing releases -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-32264][table] Add-built-in FIELD-function. [flink]
MartijnVisser commented on PR #23176: URL: https://github.com/apache/flink/pull/23176#issuecomment-1836226643 Closing this PR since we don't backport new features into existing releases -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-32263][table]Add-ELT-function [flink]
MartijnVisser commented on PR #23175: URL: https://github.com/apache/flink/pull/23175#issuecomment-1836226568 Closing this PR since we don't backport new features into existing releases -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-32263][table]Add-ELT-function [flink]
MartijnVisser closed pull request #23175: [BP-1.17][FLINK-32263][table]Add-ELT-function URL: https://github.com/apache/flink/pull/23175 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-31664][table]Add ARRAY_INTERSECT function [flink]
MartijnVisser commented on PR #23186: URL: https://github.com/apache/flink/pull/23186#issuecomment-1836226201 Closing this PR since we don't backport new features into existing releases -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP1.17][FLINK-32720][table] Add built-in GENERATE_SERIES function [flink]
MartijnVisser closed pull request #23190: [BP1.17][FLINK-32720][table] Add built-in GENERATE_SERIES function URL: https://github.com/apache/flink/pull/23190 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-32706][table] Add built-in SPLIT_STRING function [flink]
MartijnVisser closed pull request #23177: [BP-1.17][FLINK-32706][table] Add built-in SPLIT_STRING function URL: https://github.com/apache/flink/pull/23177 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-32706][table] Add built-in SPLIT_STRING function [flink]
MartijnVisser commented on PR #23177: URL: https://github.com/apache/flink/pull/23177#issuecomment-1836227367 Closing this PR since we don't backport new features into existing releases -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-32565][table] Support Cast From NUMBER to BYTES [flink]
MartijnVisser closed pull request #23189: [BP-1.17][FLINK-32565][table] Support Cast From NUMBER to BYTES URL: https://github.com/apache/flink/pull/23189 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-32564][table] Support Cast From BYTES to NUMBER [flink]
MartijnVisser closed pull request #23188: [BP-1.17][FLINK-32564][table] Support Cast From BYTES to NUMBER URL: https://github.com/apache/flink/pull/23188 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-32256][table] Add built-in ARRAY_ MIN function. [flink]
MartijnVisser commented on PR #23151: URL: https://github.com/apache/flink/pull/23151#issuecomment-1836225820 Closing this PR since we don't backport new features into existing releases -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-32256][table] Add built-in ARRAY_ MIN function. [flink]
MartijnVisser closed pull request #23151: [BP-1.17][FLINK-32256][table] Add built-in ARRAY_ MIN function. URL: https://github.com/apache/flink/pull/23151 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17]historgram [flink]
MartijnVisser commented on PR #23194: URL: https://github.com/apache/flink/pull/23194#issuecomment-1836225515 Closing this PR since we don't backport new features into existing releases -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-32261][table] Add built-in MAP_UNION function. [flink]
MartijnVisser commented on PR #23144: URL: https://github.com/apache/flink/pull/23144#issuecomment-1836226481 Closing this PR since we don't backport new features into existing releases -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-32261][table] Add built-in MAP_UNION function. [flink]
MartijnVisser closed pull request #23144: [BP-1.17][FLINK-32261][table] Add built-in MAP_UNION function. URL: https://github.com/apache/flink/pull/23144 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-31664][table]Add ARRAY_INTERSECT function [flink]
MartijnVisser closed pull request #23186: [BP-1.17][FLINK-31664][table]Add ARRAY_INTERSECT function URL: https://github.com/apache/flink/pull/23186 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]
MartijnVisser commented on PR #23185: URL: https://github.com/apache/flink/pull/23185#issuecomment-1836226100 Closing this PR since we don't backport new features into existing releases -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-31663][table] Add-ARRAY_EXCEPT-function. [flink]
MartijnVisser closed pull request #23185: [BP-1.17][FLINK-31663][table] Add-ARRAY_EXCEPT-function. URL: https://github.com/apache/flink/pull/23185 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]
MartijnVisser commented on PR #23158: URL: https://github.com/apache/flink/pull/23158#issuecomment-1836225967 Closing this PR since we don't backport new features into existing releases -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]
MartijnVisser closed pull request #23158: [BP-1.17] [FLINK-26948][table] Add-ARRAY_SORT-function. URL: https://github.com/apache/flink/pull/23158 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-32257][table] Add built-in ARRAY_MAX function. [flink]
MartijnVisser commented on PR #23149: URL: https://github.com/apache/flink/pull/23149#issuecomment-1836225680 Closing this PR since we don't backport new features into existing releases -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17]historgram [flink]
MartijnVisser closed pull request #23194: [BP-1.17]historgram URL: https://github.com/apache/flink/pull/23194 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]
mxm commented on code in PR #721: URL: https://github.com/apache/flink-kubernetes-operator/pull/721#discussion_r1412183304 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java: ## @@ -180,13 +190,28 @@ public void reconcile(FlinkResourceContext ctx) throws Exception { } } -private void applyAutoscaler(FlinkResourceContext ctx) throws Exception { +private void applyAutoscaler(FlinkResourceContext ctx, @Nullable String existingOverrides) +throws Exception { var autoScalerCtx = ctx.getJobAutoScalerContext(); boolean autoscalerEnabled = ctx.getResource().getSpec().getJob() != null && ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED); autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled); + autoscaler.scale(autoScalerCtx); Review Comment: This call to scale can still lead to a spec change, even with the code below. It just prevents spec changes related to a non-deterministic ordering of the parallelism overrides, e.g. `a:1,b:2` and `b:2,a:1`. I think it is a good idea to add documentation on how the reconciliation loop works, but it doesn't feel directly related to the changes here. There is a comment in line 211 which states that we avoid changing the spec. I can try to expand a little more.
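The ordering issue discussed above, where `a:1,b:2` and `b:2,a:1` should be treated as the same overrides, can be sketched by comparing the strings as maps rather than as text. This is an illustrative sketch only; `OverrideCompare` and its methods are invented names, not the operator's actual code.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch (not the operator's real implementation): compare parallelism
 * override strings semantically, so permutations of the same entries
 * do not register as a spec change.
 */
public class OverrideCompare {

    // Parse "a:1,b:2" into a map; entry order becomes irrelevant.
    static Map<String, String> parse(String overrides) {
        Map<String, String> m = new HashMap<>();
        if (overrides == null || overrides.isEmpty()) {
            return m;
        }
        for (String kv : overrides.split(",")) {
            String[] parts = kv.split(":", 2);
            m.put(parts[0].trim(), parts[1].trim());
        }
        return m;
    }

    // True only when the overrides differ semantically, not merely textually.
    static boolean overridesChanged(String oldOverrides, String newOverrides) {
        return !parse(oldOverrides).equals(parse(newOverrides));
    }

    public static void main(String[] args) {
        // Same entries, different order: no change, no cluster upgrade needed.
        System.out.println(overridesChanged("a:1,b:2", "b:2,a:1")); // false
        // Genuinely different parallelism for vertex "a": a real change.
        System.out.println(overridesChanged("a:1,b:2", "a:2,b:2")); // true
    }
}
```

Comparing the parsed maps instead of the raw strings is what makes a reordered but otherwise identical override set a no-op.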
Re: [PR] [BP-1.17][FLINK-32257][table] Add built-in ARRAY_MAX function. [flink]
MartijnVisser commented on PR #23149: URL: https://github.com/apache/flink/pull/23149#issuecomment-1836221184 Closing since we don't backport new functions into existing releases, only into new ones
Re: [PR] [BP-1.17][FLINK-32257][table] Add built-in ARRAY_MAX function. [flink]
MartijnVisser closed pull request #23149: [BP-1.17][FLINK-32257][table] Add built-in ARRAY_MAX function. URL: https://github.com/apache/flink/pull/23149
Re: [PR] [BP-1.17][FLINK-31665] [table] Add ARRAY_CONCAT function [flink]
MartijnVisser commented on PR #23145: URL: https://github.com/apache/flink/pull/23145#issuecomment-1836220874 Closing since we don't backport new functions into existing releases, only into new ones
Re: [PR] [BP-1.17][FLINK-31665] [table] Add ARRAY_CONCAT function [flink]
MartijnVisser closed pull request #23145: [BP-1.17][FLINK-31665] [table] Add ARRAY_CONCAT function URL: https://github.com/apache/flink/pull/23145
Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]
mxm commented on code in PR #721: URL: https://github.com/apache/flink-kubernetes-operator/pull/721#discussion_r1412176016 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java: ## @@ -180,13 +190,28 @@ public void reconcile(FlinkResourceContext ctx) throws Exception { } } -private void applyAutoscaler(FlinkResourceContext ctx) throws Exception { +private void applyAutoscaler(FlinkResourceContext ctx, @Nullable String existingOverrides) +throws Exception { var autoScalerCtx = ctx.getJobAutoScalerContext(); boolean autoscalerEnabled = ctx.getResource().getSpec().getJob() != null && ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED); autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled); + autoscaler.scale(autoScalerCtx); + +// Check that the overrides actually changed and not merely the String representation +var flinkConfig = ctx.getResource().getSpec().getFlinkConfiguration(); +var newOverrides = flinkConfig.get(PipelineOptions.PARALLELISM_OVERRIDES.key()); Review Comment: I'm curious, why is it not safe? ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java: ## @@ -137,7 +141,13 @@ public void reconcile(FlinkResourceContext ctx) throws Exception { cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec(); SPEC currentDeploySpec = cr.getSpec(); -applyAutoscaler(ctx); +applyAutoscaler( +ctx, +lastReconciledSpec != null Review Comment: Sure, let me look into that.
Re: [PR] [FLINK-33667] Implement restore tests for MatchRecognize node [flink]
jnh5y commented on code in PR #23821: URL: https://github.com/apache/flink/pull/23821#discussion_r1412167278 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeTestPrograms.java: ## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecMatch}. 
*/ +public class MatchRecognizeTestPrograms { +static final Row[] SIMPLE_DATA = { +Row.of(1L, "a"), +Row.of(2L, "z"), +Row.of(3L, "b"), +Row.of(4L, "c"), +Row.of(5L, "d"), +Row.of(6L, "a"), +Row.of(7L, "b"), +Row.of(8L, "c"), +Row.of(9L, "a"), +Row.of(10L, "b") +}; + +static final Row[] SIMPLE_DATA2 = {Row.of(11L, "c")}; + +static final Row[] COMPLEX_DATA = { +Row.of("ACME", 1L, 19, 1), +Row.of("BETA", 2L, 18, 1), +Row.of("ACME", 3L, 17, 2), +Row.of("ACME", 4L, 13, 3), +Row.of("BETA", 5L, 16, 2), +Row.of("ACME", 6L, 20, 4) +}; + +static final Row[] COMPLEX_DATA2 = {Row.of("BETA", 7L, 22, 4)}; + +static final TableTestProgram SIMPLE_MATCH = +TableTestProgram.of("simple-match", "simple match recognize test") +.setupTableSource( +SourceTestStep.newBuilder("MyTable") +.addSchema( +"id bigint", "name varchar", "proctime as PROCTIME()") +.producedBeforeRestore(SIMPLE_DATA) +.producedAfterRestore(SIMPLE_DATA2) +.build()) +.setupTableSink( +SinkTestStep.newBuilder("MySink") +.addSchema("a bigint", "b bigint", "c bigint") +.consumedBeforeRestore(Row.of(6L, 7L, 8L)) +.consumedAfterRestore(Row.of(9L, 10L, 11L)) +.build()) +.runSql( +"insert into MySink" ++ " SELECT T.aid, T.bid, T.cid\n" ++ " FROM MyTable MATCH_RECOGNIZE (\n" ++ " ORDER BY proctime\n" ++ " MEASURES\n" ++ " `A\"`.id AS aid,\n" ++ " \u006C.id AS bid,\n" ++ " C.id AS cid\n" ++ " PATTERN (`A\"` \u006C C)\n" ++ " DEFINE\n" ++ " `A\"` AS name = 'a',\n" ++ " \u006C AS name = 'b',\n" ++ " C AS name = 'c'\n" ++ " ) AS T") +.build(); + +static final TableTestProgram COMPLEX_MATCH = +TableTestProgram.of("complex-match", "complex match recognize test") +.setupTableSource( +SourceTestStep.newBuilder("MyTable") +.addSchema( +"symbol string", +"tstamp bigint", +"price int", +"tax int", +"proctime as PROCTIME()") Review Comment: Sure. Does MR do anything different depending on rowtime vs proctime? Also, just to make sure, what do you mean be
Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]
gyfora commented on code in PR #721: URL: https://github.com/apache/flink-kubernetes-operator/pull/721#discussion_r1412164693 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java: ## @@ -137,7 +141,13 @@ public void reconcile(FlinkResourceContext ctx) throws Exception { cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec(); SPEC currentDeploySpec = cr.getSpec(); -applyAutoscaler(ctx); +applyAutoscaler( +ctx, +lastReconciledSpec != null Review Comment: I think we should extract the configs through the ctx and avoid the extra logic and parameter passing
Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]
gyfora commented on code in PR #721: URL: https://github.com/apache/flink-kubernetes-operator/pull/721#discussion_r1412161793 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java: ## @@ -180,13 +190,28 @@ public void reconcile(FlinkResourceContext ctx) throws Exception { } } -private void applyAutoscaler(FlinkResourceContext ctx) throws Exception { +private void applyAutoscaler(FlinkResourceContext ctx, @Nullable String existingOverrides) +throws Exception { var autoScalerCtx = ctx.getJobAutoScalerContext(); boolean autoscalerEnabled = ctx.getResource().getSpec().getJob() != null && ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED); autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled); + autoscaler.scale(autoScalerCtx); + +// Check that the overrides actually changed and not merely the String representation +var flinkConfig = ctx.getResource().getSpec().getFlinkConfiguration(); +var newOverrides = flinkConfig.get(PipelineOptions.PARALLELISM_OVERRIDES.key()); Review Comment: It’s not safe to access configs by key directly. We can use the ctx getDeployConfig and getObserveConfig to get the configs and then we can even avoid passing the extra parameter to the method
Re: [PR] [FLINK-33638][table] Support variable-length data generation for variable-length data types [flink]
liyubin117 commented on code in PR #23810: URL: https://github.com/apache/flink/pull/23810#discussion_r1412149258 ## docs/content.zh/docs/connectors/table/datagen.md: ## @@ -29,51 +29,202 @@ under the License. {{< label "Scan Source: 有界" >}} {{< label "Scan Source: 无界" >}} -DataGen 连接器允许按数据生成规则进行读取。 +DataGen 连接器允许基于内存生成数据来创建表。 +在本地开发时,若不访问外部系统(如 Kafka),这会非常有用。 +可以使用[计算列语法]({{< ref "docs/dev/table/sql/create" >}}#create-table)灵活地生成记录。 -DataGen 连接器可以使用[计算列语法]({{< ref "docs/dev/table/sql/create" >}}#create-table)。 -这使您可以灵活地生成记录。 +DataGen 连接器是内置的,不需要额外的依赖项。 -DataGen 连接器是内置的。 +用法 +- -注意 不支持复杂类型: Array,Map,Row。 请用计算列构造这些类型。 +默认情况下,DataGen 表将创建无限数量的行,每列都有一个随机值。 +对于 char、varchar、binary、varbinary、string、array、map 和 multiset 类型,可以指定长度。 +还可以指定总行数,从而生成有界表。 -怎么创建一个 DataGen 的表 - - -表的有界性:当表中字段的数据全部生成完成后,source 就结束了。 因此,表的有界性取决于字段的有界性。 - -每个列,都有两种生成数据的方法: - -- 随机生成器是默认的生成器,您可以指定随机生成的最大和最小值。char、varchar、binary、varbinary, string (类型)可以指定长度。它是无界的生成器。 +还支持序列生成器,您可以指定序列的起始和结束值。 +如果表中有任一列是序列类型,则该表将是有界的,并在第一个序列完成时结束。 -- 序列生成器,您可以指定序列的起始和结束值。它是有界的生成器,当序列数字达到结束值,读取结束。 +时间类型字段对应的值始终是本地计算机当前系统时间。 ```sql -CREATE TABLE datagen ( - f_sequence INT, - f_random INT, - f_random_str STRING, - ts AS localtimestamp, - WATERMARK FOR ts AS ts +CREATE TABLE Orders ( +order_number BIGINT, +priceDECIMAL(32,2), +buyerROW, +order_time TIMESTAMP(3) ) WITH ( - 'connector' = 'datagen', + 'connector' = 'datagen' +) +``` - -- optional options -- +DataGen 连接器通常与 ``LIKE`` 子句结合使用,以模拟物理表。 - 'rows-per-second'='5', +```sql +CREATE TABLE Orders ( +order_number BIGINT, +priceDECIMAL(32,2), +buyerROW, +order_time TIMESTAMP(3) +) WITH (...) 
- 'fields.f_sequence.kind'='sequence', - 'fields.f_sequence.start'='1', - 'fields.f_sequence.end'='1000', +-- create a bounded mock table +CREATE TEMPORARY TABLE GenOrders +WITH ( +'connector' = 'datagen', +'number-of-rows' = '10' +) +LIKE Orders (EXCLUDING ALL) +``` - 'fields.f_random.min'='1', - 'fields.f_random.max'='1000', +此外,对于可变长度类型(varchar、string、varbinary 和 bytes),您可以指定是否生成可变长度的数据。 - 'fields.f_random_str.length'='10' +```sql +CREATE TABLE Orders ( +order_number BIGINT, +priceDECIMAL(32,2), +buyerROW, +order_time TIMESTAMP(3), +seller VARCHAR(150) +) WITH ( + 'connector' = 'datagen', + 'fields.seller.var-len' = 'true' ) ``` +字段类型 +- + + + + +Type +Supported Generators +Notes + + + + +BOOLEAN +random + + + +CHAR +random / sequence + + + +VARCHAR +random / sequence + + + +BINARY +random / sequence + + + +VARBINARY +random / sequence + + + +STRING +random / sequence + + + +DECIMAL +random / sequence + + + +TINYINT +random / sequence + + + +SMALLINT +random / sequence + + + +INT +random / sequence + + + +BIGINT +random / sequence + + + +FLOAT +random / sequence + + + +DOUBLE +random / sequence + + + +DATE +random +总是解析为本地计算机的当前日期。 + + +TIME +random +总是解析为本地计算机的当前日期。 Review Comment: done ## docs/content.zh/docs/connectors/table/datagen.md: ## @@ -29,51 +29,202 @@ under the License. 
{{< label "Scan Source: 有界" >}} {{< label "Scan Source: 无界" >}} -DataGen 连接器允许按数据生成规则进行读取。 +DataGen 连接器允许基于内存生成数据来创建表。 +在本地开发时,若不访问外部系统(如 Kafka),这会非常有用。 +可以使用[计算列语法]({{< ref "docs/dev/table/sql/create" >}}#create-table)灵活地生成记录。 -DataGen 连接器可以使用[计算列语法]({{< ref "docs/dev/table/sql/create" >}}#create-table)。 -这使您可以灵活地生成记录。 +DataGen 连接器是内置的,不需要额外的依赖项。 -DataGen 连接器是内置的。 +用法 +- -注意 不支持复杂类型: Array,Map,Row。 请用计算列构造这些类型。 +默认情况下,DataGen 表将创建无限数量的行,每列都有一个随机值。 +对于 char、varchar、binary、varbinary、string、array、map 和 multiset 类型,可以指定长度。 +还可以指定总行数,从而生成有界表。 -怎么创建一个 DataGen 的表 - - -表的有界性:当表中字段的数据全部生成完成后,source 就结束了。 因此,表的有界性取决于字段的有界性。 - -每个列,都有两种生成数据的方法: - -- 随机生成器是默认的生成器,您可以指定随机生成的最大和最小值。char、varchar、binary、varbinary, string (类型)可以指定长度。它是无界的生成器。 +还支持序列生成器,您可以指定序列的起始和结束值。 +如果表中有任一列是序列类型,则该表将是有界的,并在第一个序列完成时结束。 -- 序列生成器,您可以指定序列的起始和结束值。它是有界的生成器,当序列数字达到结束值,读取结束。 +时间类型字段对应的值始终是本地计算机当前系统时间。 ```sql -CREATE TABLE datagen ( - f_sequence INT, - f_random INT, - f_random_str STRING, - ts AS
Re: [PR] [FLINK-33638][table] Support variable-length data generation for variable-length data types [flink]
liyubin117 commented on code in PR #23810: URL: https://github.com/apache/flink/pull/23810#discussion_r1412147094 ## docs/content.zh/docs/connectors/table/datagen.md: ## @@ -29,51 +29,202 @@ under the License. {{< label "Scan Source: 有界" >}} {{< label "Scan Source: 无界" >}} -DataGen 连接器允许按数据生成规则进行读取。 +DataGen 连接器允许基于内存生成数据来创建表。 +在本地开发时,若不访问外部系统(如 Kafka),这会非常有用。 +可以使用[计算列语法]({{< ref "docs/dev/table/sql/create" >}}#create-table)灵活地生成记录。 -DataGen 连接器可以使用[计算列语法]({{< ref "docs/dev/table/sql/create" >}}#create-table)。 -这使您可以灵活地生成记录。 +DataGen 连接器是内置的,不需要额外的依赖项。 -DataGen 连接器是内置的。 +用法 +- -注意 不支持复杂类型: Array,Map,Row。 请用计算列构造这些类型。 +默认情况下,DataGen 表将创建无限数量的行,每列都有一个随机值。 +对于 char、varchar、binary、varbinary、string、array、map 和 multiset 类型,可以指定长度。 +还可以指定总行数,从而生成有界表。 -怎么创建一个 DataGen 的表 - - -表的有界性:当表中字段的数据全部生成完成后,source 就结束了。 因此,表的有界性取决于字段的有界性。 - -每个列,都有两种生成数据的方法: - -- 随机生成器是默认的生成器,您可以指定随机生成的最大和最小值。char、varchar、binary、varbinary, string (类型)可以指定长度。它是无界的生成器。 +还支持序列生成器,您可以指定序列的起始和结束值。 +如果表中有任一列是序列类型,则该表将是有界的,并在第一个序列完成时结束。 -- 序列生成器,您可以指定序列的起始和结束值。它是有界的生成器,当序列数字达到结束值,读取结束。 +时间类型字段对应的值始终是本地计算机当前系统时间。 ```sql -CREATE TABLE datagen ( - f_sequence INT, - f_random INT, - f_random_str STRING, - ts AS localtimestamp, - WATERMARK FOR ts AS ts +CREATE TABLE Orders ( +order_number BIGINT, +priceDECIMAL(32,2), +buyerROW, +order_time TIMESTAMP(3) ) WITH ( - 'connector' = 'datagen', + 'connector' = 'datagen' +) +``` - -- optional options -- +DataGen 连接器通常与 ``LIKE`` 子句结合使用,以模拟物理表。 - 'rows-per-second'='5', +```sql +CREATE TABLE Orders ( +order_number BIGINT, +priceDECIMAL(32,2), +buyerROW, +order_time TIMESTAMP(3) +) WITH (...) 
- 'fields.f_sequence.kind'='sequence', - 'fields.f_sequence.start'='1', - 'fields.f_sequence.end'='1000', +-- create a bounded mock table +CREATE TEMPORARY TABLE GenOrders +WITH ( +'connector' = 'datagen', +'number-of-rows' = '10' +) +LIKE Orders (EXCLUDING ALL) +``` - 'fields.f_random.min'='1', - 'fields.f_random.max'='1000', +此外,对于可变长度类型(varchar、string、varbinary 和 bytes),您可以指定是否生成可变长度的数据。 - 'fields.f_random_str.length'='10' +```sql +CREATE TABLE Orders ( +order_number BIGINT, +priceDECIMAL(32,2), +buyerROW, +order_time TIMESTAMP(3), +seller VARCHAR(150) +) WITH ( + 'connector' = 'datagen', + 'fields.seller.var-len' = 'true' ) ``` +字段类型 +- + + + + +Type +Supported Generators +Notes + + + + +BOOLEAN +random + + + +CHAR +random / sequence + + + +VARCHAR +random / sequence + + + +BINARY +random / sequence + + + +VARBINARY +random / sequence + + + +STRING +random / sequence + + + +DECIMAL +random / sequence + + + +TINYINT +random / sequence + + + +SMALLINT +random / sequence + + + +INT +random / sequence + + + +BIGINT +random / sequence + + + +FLOAT +random / sequence + + + +DOUBLE +random / sequence + + + +DATE +random +总是解析为本地计算机的当前日期。 + + +TIME +random +总是解析为本地计算机的当前日期。 + + +TIMESTAMP +random + +解析相对于本地计算机的当前时间戳向过去偏移的时间戳。偏移的最大值可以通过 'max-past' 选项指定。 + + + +TIMESTAMP_LTZ +random + +解析相对于本地计算机的当前时间戳向过去偏移的时间戳。偏移的最大值可以通过 'max-past' 选项指定。 + + + +INTERVAL YEAR TO MONTH +random + + + +INTERVAL DAY TO MONTH +random + + + +ROW +random +生成具有随机字段数据的行。 + + +ARRAY +random +生成具有随机元素的数组。 + + +MAP +random +生成具有随机元素的映射。 Review Comment: reasonable indeed ## docs/content.zh/docs/connectors/table/datagen.md: ## @@ -29,51 +29,202 @@ under the License. {{< label "Scan Source: 有界" >}} {{< label "Scan Source: 无界" >}} -DataGen 连接器允许按数据生成规则进行读取。 +DataGen 连接器允许基于内存生成数据来创建表。 +在本地开发时,若不访问外部系统(如 Kafka),这会非常有用。 +可以使用[计算列语法]({{< ref "docs/dev/table/sql/create" >}}#create-table)灵活地生成记录。 -DataGen
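As a rough illustration of the `var-len` option under review: with `'fields.<field>.var-len' = 'true'` the generated value's length varies up to the declared maximum, instead of always being exactly the declared length. The sketch below is an assumption about that behavior, not Flink's DataGen implementation; `VarLenGen` and `randomString` are made-up names.

```java
import java.util.Random;

/**
 * Sketch of variable-length data generation as discussed in the PR
 * (assumption, not Flink's DataGenerator code): for a VARCHAR(n) field,
 * var-len picks a random length in [0, n] rather than always using n.
 */
public class VarLenGen {
    private static final Random RANDOM = new Random();

    static String randomString(int maxLen, boolean varLen) {
        // Fixed-length mode always fills the declared maximum.
        int len = varLen ? RANDOM.nextInt(maxLen + 1) : maxLen;
        StringBuilder sb = new StringBuilder(len);
        for (int i = 0; i < len; i++) {
            sb.append((char) ('a' + RANDOM.nextInt(26)));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(randomString(150, true).length());  // somewhere in [0, 150]
        System.out.println(randomString(150, false).length()); // always 150
    }
}
```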
[jira] [Closed] (FLINK-33702) Add IncrementalDelayRetryStrategy in AsyncRetryStrategies
[ https://issues.apache.org/jira/browse/FLINK-33702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiangyu feng closed FLINK-33702. Resolution: Not A Problem `AsyncRetryStrategy` is designed for AsyncWaitOperator to use. It may not be suitable for `CollectResultFetcher` to use. > Add IncrementalDelayRetryStrategy in AsyncRetryStrategies > -- > > Key: FLINK-33702 > URL: https://issues.apache.org/jira/browse/FLINK-33702 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Reporter: xiangyu feng >Priority: Major > Labels: pull-request-available > > AsyncRetryStrategies now supports NoRetryStrategy, FixedDelayRetryStrategy > and ExponentialBackoffDelayRetryStrategy. > In certain scenarios, we also need IncrementalDelayRetryStrategy to reduce > the retry count and perform the action more timely. > IncrementalDelayRetryStrategy will increase the retry delay at a fixed rate > for each attempt. -- This message was sent by Atlassian Jira (v8.20.10#820010)
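The proposed strategy in the ticket, a delay that grows by a fixed increment on each attempt (in contrast to the existing fixed and exponential strategies), could look like the following. This is a hypothetical illustration; `IncrementalDelay`, `backoff`, and the parameter names are not Flink API.

```java
/**
 * Hypothetical sketch of an incremental-delay retry strategy as described
 * in FLINK-33702: the delay increases by a fixed increment per attempt,
 * capped at a maximum. Not Flink's actual AsyncRetryStrategies code.
 */
public class IncrementalDelay {

    // attempt is 1-based: attempt 1 waits initialDelayMs, attempt 2 waits
    // initialDelayMs + incrementMs, and so on, never exceeding maxDelayMs.
    static long backoff(int attempt, long initialDelayMs, long incrementMs, long maxDelayMs) {
        long delay = initialDelayMs + (long) (attempt - 1) * incrementMs;
        return Math.min(delay, maxDelayMs);
    }

    public static void main(String[] args) {
        // 100ms, 150ms, 200ms, ... capped at 400ms
        for (int attempt = 1; attempt <= 8; attempt++) {
            System.out.println("attempt " + attempt + ": "
                    + backoff(attempt, 100, 50, 400) + "ms");
        }
    }
}
```

Compared with exponential backoff, the linear growth retries sooner in the mid-range, which matches the ticket's goal of performing the action more promptly.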
Re: [PR] [FLINK-33638][table] Support variable-length data generation for variable-length data types [flink]
lincoln-lil commented on code in PR #23810: URL: https://github.com/apache/flink/pull/23810#discussion_r1412107586 ## docs/content.zh/docs/connectors/table/datagen.md: ## @@ -29,51 +29,202 @@ under the License. {{< label "Scan Source: 有界" >}} {{< label "Scan Source: 无界" >}} -DataGen 连接器允许按数据生成规则进行读取。 +DataGen 连接器允许基于内存生成数据来创建表。 +在本地开发时,若不访问外部系统(如 Kafka),这会非常有用。 +可以使用[计算列语法]({{< ref "docs/dev/table/sql/create" >}}#create-table)灵活地生成记录。 -DataGen 连接器可以使用[计算列语法]({{< ref "docs/dev/table/sql/create" >}}#create-table)。 -这使您可以灵活地生成记录。 +DataGen 连接器是内置的,不需要额外的依赖项。 -DataGen 连接器是内置的。 +用法 +- -注意 不支持复杂类型: Array,Map,Row。 请用计算列构造这些类型。 +默认情况下,DataGen 表将创建无限数量的行,每列都有一个随机值。 +对于 char、varchar、binary、varbinary、string、array、map 和 multiset 类型,可以指定长度。 +还可以指定总行数,从而生成有界表。 -怎么创建一个 DataGen 的表 - - -表的有界性:当表中字段的数据全部生成完成后,source 就结束了。 因此,表的有界性取决于字段的有界性。 - -每个列,都有两种生成数据的方法: - -- 随机生成器是默认的生成器,您可以指定随机生成的最大和最小值。char、varchar、binary、varbinary, string (类型)可以指定长度。它是无界的生成器。 +还支持序列生成器,您可以指定序列的起始和结束值。 +如果表中有任一列是序列类型,则该表将是有界的,并在第一个序列完成时结束。 -- 序列生成器,您可以指定序列的起始和结束值。它是有界的生成器,当序列数字达到结束值,读取结束。 +时间类型字段对应的值始终是本地计算机当前系统时间。 ```sql -CREATE TABLE datagen ( - f_sequence INT, - f_random INT, - f_random_str STRING, - ts AS localtimestamp, - WATERMARK FOR ts AS ts +CREATE TABLE Orders ( +order_number BIGINT, +priceDECIMAL(32,2), +buyerROW, +order_time TIMESTAMP(3) ) WITH ( - 'connector' = 'datagen', + 'connector' = 'datagen' +) +``` - -- optional options -- +DataGen 连接器通常与 ``LIKE`` 子句结合使用,以模拟物理表。 - 'rows-per-second'='5', +```sql +CREATE TABLE Orders ( +order_number BIGINT, +priceDECIMAL(32,2), +buyerROW, +order_time TIMESTAMP(3) +) WITH (...) 
- 'fields.f_sequence.kind'='sequence', - 'fields.f_sequence.start'='1', - 'fields.f_sequence.end'='1000', +-- create a bounded mock table +CREATE TEMPORARY TABLE GenOrders +WITH ( +'connector' = 'datagen', +'number-of-rows' = '10' +) +LIKE Orders (EXCLUDING ALL) +``` - 'fields.f_random.min'='1', - 'fields.f_random.max'='1000', +此外,对于可变长度类型(varchar、string、varbinary 和 bytes),您可以指定是否生成可变长度的数据。 - 'fields.f_random_str.length'='10' +```sql +CREATE TABLE Orders ( +order_number BIGINT, +priceDECIMAL(32,2), +buyerROW, +order_time TIMESTAMP(3), +seller VARCHAR(150) +) WITH ( + 'connector' = 'datagen', + 'fields.seller.var-len' = 'true' ) ``` +字段类型 +- + + + + +Type +Supported Generators +Notes + + + + +BOOLEAN +random + + + +CHAR +random / sequence + + + +VARCHAR +random / sequence + + + +BINARY +random / sequence + + + +VARBINARY +random / sequence + + + +STRING +random / sequence + + + +DECIMAL +random / sequence + + + +TINYINT +random / sequence + + + +SMALLINT +random / sequence + + + +INT +random / sequence + + + +BIGINT +random / sequence + + + +FLOAT +random / sequence + + + +DOUBLE +random / sequence + + + +DATE +random +总是解析为本地计算机的当前日期。 + + +TIME +random +总是解析为本地计算机的当前日期。 Review Comment: 总是解析为本地机器的当前时间。 ## docs/content.zh/docs/connectors/table/datagen.md: ## @@ -29,51 +29,202 @@ under the License. 
{{< label "Scan Source: 有界" >}} {{< label "Scan Source: 无界" >}} -DataGen 连接器允许按数据生成规则进行读取。 +DataGen 连接器允许基于内存生成数据来创建表。 +在本地开发时,若不访问外部系统(如 Kafka),这会非常有用。 +可以使用[计算列语法]({{< ref "docs/dev/table/sql/create" >}}#create-table)灵活地生成记录。 -DataGen 连接器可以使用[计算列语法]({{< ref "docs/dev/table/sql/create" >}}#create-table)。 -这使您可以灵活地生成记录。 +DataGen 连接器是内置的,不需要额外的依赖项。 -DataGen 连接器是内置的。 +用法 +- -注意 不支持复杂类型: Array,Map,Row。 请用计算列构造这些类型。 +默认情况下,DataGen 表将创建无限数量的行,每列都有一个随机值。 +对于 char、varchar、binary、varbinary、string、array、map 和 multiset 类型,可以指定长度。 +还可以指定总行数,从而生成有界表。 -怎么创建一个 DataGen 的表 - - -表的有界性:当表中字段的数据全部生成完成后,source 就结束了。 因此,表的有界性取决于字段的有界性。 - -每个列,都有两种生成数据的方法: - -- 随机生成器是默认的生成器,您可以指定随机生成的最大和最小值。char、varchar、binary、varbinary, string (类型)可以指定长度。它是无界的生成器。 +还支持序列生成器,您可以指定序列的起始和结束值。 +如果表中有任一列是序列类型,则该表将是有界的,并在第一个序列完成时结束。 -- 序列生成器,您可以指定序列的起始和结束值。它是有界的生成器,当序列数字达到结束值,读取结束。 +时间类型字段对应的值始终是本地计算机当前系统时间。 ```sql -CREATE TABLE datagen ( - f_sequence INT, - f_random INT, - f_random_str STRING, - ts
Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]
afedulov commented on code in PR #721: URL: https://github.com/apache/flink-kubernetes-operator/pull/721#discussion_r1412106299 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java: ## @@ -180,13 +190,28 @@ public void reconcile(FlinkResourceContext ctx) throws Exception { } } -private void applyAutoscaler(FlinkResourceContext ctx) throws Exception { +private void applyAutoscaler(FlinkResourceContext ctx, @Nullable String existingOverrides) +throws Exception { var autoScalerCtx = ctx.getJobAutoScalerContext(); boolean autoscalerEnabled = ctx.getResource().getSpec().getJob() != null && ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED); autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled); + autoscaler.scale(autoScalerCtx); Review Comment: Maybe add a comment to the section below explaining why this call to `scale` does not lead to the unnecessary update if we only patch the new config after it. I personally do not directly understand why this is the case.
Re: [PR] [FLINK-32877]add HTTP options to gcs-cloud-storage client [flink]
MartijnVisser commented on PR #23226: URL: https://github.com/apache/flink/pull/23226#issuecomment-1836116534 Also @singhravidutt what's your Jira user name, so I can assign this ticket to you?
Re: [PR] [FLINK-32877]add HTTP options to gcs-cloud-storage client [flink]
MartijnVisser commented on PR #23226: URL: https://github.com/apache/flink/pull/23226#issuecomment-1836115444 @singhravidutt Could you please squash your commits and rebase? Your commit message should start with `[FLINK-32877][Filesystem]` as well.
[jira] [Closed] (FLINK-32878) Add entropy to gcs path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-32878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-32878. -- Fix Version/s: 1.19.0 Resolution: Fixed Fixed in apache/flink:master dc1db12137ecad921ae90969d7bfbf1ee7d3d2ef > Add entropy to gcs path for better scalability > -- > > Key: FLINK-32878 > URL: https://issues.apache.org/jira/browse/FLINK-32878 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.15.4, 1.16.2, 1.17.1 >Reporter: Jayadeep Jayaraman >Assignee: Cheena >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > Currently GCS is used as a backend for both checkpointing and sink. In both > these cases the file names are sequential which causes hotspotting in GCS and > results in HTTP 5XX status code. > > As per [GCS best > practices]([https://cloud.google.com/storage/docs/request-rate)] it is > advisable to spread out the object creation with random names close to the > beginning of the file name. > > There is similar work done already for S3 as part of FLINK-9061 -- This message was sent by Atlassian Jira (v8.20.10#820010)
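The best-practices recommendation quoted above — random characters close to the beginning of the object name — can be sketched stand-alone. This is only an illustration of the idea, not the implementation merged for FLINK-32878; the prefix length and hex format are assumptions.

```java
import java.util.concurrent.ThreadLocalRandom;

public class EntropyDemo {
    // Prepend a short random hex segment so that concurrently created objects
    // differ near the start of their names, spreading writes across GCS's keyspace.
    static String withEntropy(String objectName) {
        int entropy = ThreadLocalRandom.current().nextInt(0x10000); // 4 hex chars
        return String.format("%04x/%s", entropy, objectName);
    }

    public static void main(String[] args) {
        System.out.println(withEntropy("checkpoints/job-42/chk-17/_metadata"));
    }
}
```

Sequentially named checkpoint files all hash to neighboring GCS key ranges; the random prefix breaks that locality, which is the same motivation behind the S3 entropy injection done in FLINK-9061.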
[jira] [Assigned] (FLINK-32878) Add entropy to gcs path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-32878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser reassigned FLINK-32878: -- Assignee: Cheena (was: Jayadeep Jayaraman) > Add entropy to gcs path for better scalability > -- > > Key: FLINK-32878 > URL: https://issues.apache.org/jira/browse/FLINK-32878 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.15.4, 1.16.2, 1.17.1 >Reporter: Jayadeep Jayaraman >Assignee: Cheena >Priority: Major > Labels: pull-request-available > > Currently GCS is used as a backend for both checkpointing and sink. In both > these cases the file names are sequential which causes hotspotting in GCS and > results in HTTP 5XX status code. > > As per [GCS best > practices]([https://cloud.google.com/storage/docs/request-rate)] it is > advisable to spread out the object creation with random names close to the > beginning of the file name. > > There is similar work done already for S3 as part of FLINK-9061 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-32878] [FileSystems] Add entropy in temporary object name in flink gcs filesystem [flink]
MartijnVisser merged PR #23729: URL: https://github.com/apache/flink/pull/23729 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-32878) Add entropy to gcs path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-32878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser reassigned FLINK-32878: -- Assignee: Jayadeep Jayaraman > Add entropy to gcs path for better scalability > -- > > Key: FLINK-32878 > URL: https://issues.apache.org/jira/browse/FLINK-32878 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.15.4, 1.16.2, 1.17.1 >Reporter: Jayadeep Jayaraman >Assignee: Jayadeep Jayaraman >Priority: Major > Labels: pull-request-available > > Currently GCS is used as a backend for both checkpointing and sink. In both > these cases the file names are sequential which causes hotspotting in GCS and > results in HTTP 5XX status code. > > As per [GCS best > practices]([https://cloud.google.com/storage/docs/request-rate)] it is > advisable to spread out the object creation with random names close to the > beginning of the file name. > > There is similar work done already for S3 as part of FLINK-9061 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-33508) Support for wildcard paths in Flink History Server for multi cluster environment
[ https://issues.apache.org/jira/browse/FLINK-33508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser reassigned FLINK-33508: -- Assignee: Jayadeep Jayaraman > Support for wildcard paths in Flink History Server for multi cluster > environment > > > Key: FLINK-33508 > URL: https://issues.apache.org/jira/browse/FLINK-33508 > Project: Flink > Issue Type: Improvement >Reporter: Jayadeep Jayaraman >Assignee: Jayadeep Jayaraman >Priority: Major > Labels: pull-request-available > > In Cloud users typically create multiple clusters which are ephemeral and > want a single history server to look at historical jobs. > To implement this history server needs to support wildcard paths and this > change is to support such wildcard paths -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33667] Implement restore tests for MatchRecognize node [flink]
dawidwys commented on code in PR #23821: URL: https://github.com/apache/flink/pull/23821#discussion_r1412092205 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeTestPrograms.java: ## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecMatch}. 
*/ +public class MatchRecognizeTestPrograms { +static final Row[] SIMPLE_DATA = { +Row.of(1L, "a"), +Row.of(2L, "z"), +Row.of(3L, "b"), +Row.of(4L, "c"), +Row.of(5L, "d"), +Row.of(6L, "a"), +Row.of(7L, "b"), +Row.of(8L, "c"), +Row.of(9L, "a"), +Row.of(10L, "b") +}; + +static final Row[] SIMPLE_DATA2 = {Row.of(11L, "c")}; + +static final Row[] COMPLEX_DATA = { +Row.of("ACME", 1L, 19, 1), +Row.of("BETA", 2L, 18, 1), +Row.of("ACME", 3L, 17, 2), +Row.of("ACME", 4L, 13, 3), +Row.of("BETA", 5L, 16, 2), +Row.of("ACME", 6L, 20, 4) +}; + +static final Row[] COMPLEX_DATA2 = {Row.of("BETA", 7L, 22, 4)}; + +static final TableTestProgram SIMPLE_MATCH = +TableTestProgram.of("simple-match", "simple match recognize test") +.setupTableSource( +SourceTestStep.newBuilder("MyTable") +.addSchema( +"id bigint", "name varchar", "proctime as PROCTIME()") +.producedBeforeRestore(SIMPLE_DATA) +.producedAfterRestore(SIMPLE_DATA2) +.build()) +.setupTableSink( +SinkTestStep.newBuilder("MySink") +.addSchema("a bigint", "b bigint", "c bigint") +.consumedBeforeRestore(Row.of(6L, 7L, 8L)) +.consumedAfterRestore(Row.of(9L, 10L, 11L)) +.build()) +.runSql( +"insert into MySink" ++ " SELECT T.aid, T.bid, T.cid\n" ++ " FROM MyTable MATCH_RECOGNIZE (\n" ++ " ORDER BY proctime\n" ++ " MEASURES\n" ++ " `A\"`.id AS aid,\n" ++ " \u006C.id AS bid,\n" ++ " C.id AS cid\n" ++ " PATTERN (`A\"` \u006C C)\n" ++ " DEFINE\n" ++ " `A\"` AS name = 'a',\n" ++ " \u006C AS name = 'b',\n" ++ " C AS name = 'c'\n" ++ " ) AS T") +.build(); + +static final TableTestProgram COMPLEX_MATCH = +TableTestProgram.of("complex-match", "complex match recognize test") +.setupTableSource( +SourceTestStep.newBuilder("MyTable") +.addSchema( +"symbol string", +"tstamp bigint", +"price int", +"tax int", +"proctime as PROCTIME()") Review Comment: Can we add tests covering: * rowtime * secondary sorting This will improve test plan coverage -- This
[jira] [Commented] (FLINK-33694) GCS filesystem does not respect gs.storage.root.url config option
[ https://issues.apache.org/jira/browse/FLINK-33694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17792050#comment-17792050 ] Patrick Lucas commented on FLINK-33694: --- [~martijnvisser] my comment is perhaps not precise, in that "support has been added to configure credentials correctly" should be qualified with the comment from the docs about which credentials mechanisms are supported. But it is still true that other potentially-interesting options are not proxied through, such as setting the root URL. This change as written doesn't affect any credentials handling, only adding support for this one additional option. However, I could see an argument for implementing the behavior in {{org.apache.flink.fs.gs.utils.ConfigUtils}} as the credentials behavior is rather than in {{GSFileSystemOptions}} as I did to start with. > GCS filesystem does not respect gs.storage.root.url config option > - > > Key: FLINK-33694 > URL: https://issues.apache.org/jira/browse/FLINK-33694 > Project: Flink > Issue Type: Bug > Components: FileSystems >Affects Versions: 1.18.0, 1.17.2 >Reporter: Patrick Lucas >Priority: Major > Labels: gcs, pull-request-available > > The GCS FileSystem's RecoverableWriter implementation uses the GCS SDK > directly rather than going through Hadoop. While support has been added to > configure credentials correctly based on the standard Hadoop implementation > configuration, no other options are passed through to the underlying client. > Because this only affects the RecoverableWriter-related codepaths, it can > result in very surprising differing behavior whether the FileSystem is being > used as a source or a sink—while a {{{}gs://{}}}-URI FileSource may work > fine, a {{{}gs://{}}}-URI FileSink may not work at all. > We use [fake-gcs-server|https://github.com/fsouza/fake-gcs-server] in > testing, and so we override the Hadoop GCS FileSystem config option > {{{}gs.storage.root.url{}}}. 
However, because this option is not considered > when creating the GCS client for the RecoverableWriter codepath, in a > FileSink the GCS FileSystem attempts to write to the real GCS service rather > than fake-gcs-server. At the same time, a FileSource works as expected, > reading from fake-gcs-server. > The fix should be fairly straightforward, reading the {{gs.storage.root.url}} > config option from the Hadoop FileSystem config in > [{{GSFileSystemOptions}}|https://github.com/apache/flink/blob/release-1.18.0/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemOptions.java#L30] > and, if set, passing it to {{storageOptionsBuilder}} in > [{{GSFileSystemFactory}}|https://github.com/apache/flink/blob/release-1.18.0/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java]. > The only workaround for this is to build a custom flink-gs-fs-hadoop JAR with > a patch and use it as a plugin. -- This message was sent by Atlassian Jira (v8.20.10#820010)
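The fix described above is essentially one config lookup plus one conditional builder call. A minimal sketch of that plumbing, with a plain `Map` standing in for the Hadoop configuration and a tiny holder standing in for the GCS SDK's `StorageOptions.Builder` (both stand-ins are assumptions, not the real Flink or GCS types):

```java
import java.util.Map;
import java.util.Optional;

public class RootUrlPlumbing {
    static final String ROOT_URL_KEY = "gs.storage.root.url";

    /** Stand-in for the SDK builder the real code would call a host setter on. */
    static class StorageOptionsBuilder {
        String host = "https://storage.googleapis.com"; // SDK default endpoint
    }

    // What the options class would read from the Hadoop config...
    static Optional<String> storageRootUrl(Map<String, String> hadoopConfig) {
        return Optional.ofNullable(hadoopConfig.get(ROOT_URL_KEY));
    }

    // ...and what the factory would then apply when building the client,
    // so the RecoverableWriter path honors the same endpoint as the Hadoop path.
    static StorageOptionsBuilder configure(Map<String, String> hadoopConfig) {
        StorageOptionsBuilder builder = new StorageOptionsBuilder();
        storageRootUrl(hadoopConfig).ifPresent(url -> builder.host = url);
        return builder;
    }
}
```

With this shape, pointing `gs.storage.root.url` at fake-gcs-server would affect sinks and sources alike, removing the source/sink asymmetry the report describes.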
Re: [PR] [FLINK-33710] Prevent NOOP spec updates from the autoscaler parallelism override map [flink-kubernetes-operator]
mxm commented on PR #720: URL: https://github.com/apache/flink-kubernetes-operator/pull/720#issuecomment-1836099166 No harm done @1996fanrui! Thanks for the quick review! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [Flink 31966] Flink Kubernetes operator lacks TLS support [flink-kubernetes-operator]
tagarr commented on PR #712: URL: https://github.com/apache/flink-kubernetes-operator/pull/712#issuecomment-1836094102 @gaborgsomogyi are my changes what you expected ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33433][rest] Introduce async-profiler to support profiling Job… [flink]
Myasuka commented on code in PR #23820: URL: https://github.com/apache/flink/pull/23820#discussion_r1412050762 ## flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ProfilingService.java: ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import one.profiler.AsyncProfiler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** Create and keep profiling requests with rolling policy. */ +public class ProfilingService implements Closeable { Review Comment: Since `ProfilingService` is also used on JobManager, shall we move this class to somewhere like `org.apache.flink.runtime.util`? ## flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ProfilingService.java: ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import one.profiler.AsyncProfiler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** Create and keep profiling requests with rolling policy. */ +public class ProfilingService implements Closeable { Review Comment: Moreover, we should add some tests for this service. At least including: 1. singleton instance 2. only running one profiling at the same time 3. rolling clean up local files ## flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ProfilingService.java: ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache
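The second suggested test ("only running one profiling at the same time") typically exercises a compare-and-set guard. A generic sketch of that guard pattern — not the actual ProfilingService code, whose internals are not shown in full here — looks like:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class SingleFlightGuard {
    private final AtomicBoolean running = new AtomicBoolean(false);

    /** Returns true only for the request that wins the race; concurrent requests are rejected. */
    public boolean tryStart() {
        return running.compareAndSet(false, true);
    }

    /** Called when the profiling run completes, allowing the next request to start. */
    public void finish() {
        running.set(false);
    }
}
```

A unit test then asserts that a second `tryStart()` fails while the first run is in flight and succeeds again after `finish()`.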
[jira] [Commented] (FLINK-32028) Error handling for the new Elasticsearch sink
[ https://issues.apache.org/jira/browse/FLINK-32028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17792024#comment-17792024 ] Peter Schulz commented on FLINK-32028: -- I gave it [a try|https://github.com/apache/flink-connector-elasticsearch/pull/83] to come up with a minimally invasive, backwards compatible approach. If it were only about the error handling, you could stay in elasticsearch-land and decide to throw or not to throw based on {{BulkRequest}} and {{BulkResponse}}. However, we also needed metrics, and this is where I had to bridge between flink-land and elasticsearch-land and allow the newly introduced `BulkRequestInterceptorFactory` to be aware of {{MetricGroup}}. This approach is somewhat tailored to our needs, so I would highly appreciate feedback to make it generally applicable. > Error handling for the new Elasticsearch sink > - > > Key: FLINK-32028 > URL: https://issues.apache.org/jira/browse/FLINK-32028 > Project: Flink > Issue Type: Improvement > Components: Connectors / ElasticSearch >Affects Versions: 1.16.1 >Reporter: Tudor Plugaru >Priority: Major > Labels: pull-request-available > > The deprecated ElasticsearchSink supports setting an error handler via a > [public method > |https://github.com/apache/flink-connector-elasticsearch/blob/8f75d4e059c09b55cc3a44bab3e64330b1246d27/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch7/ElasticsearchSink.java#L216] > but the new sink, does not. > Ideally there would be a way to handle ES specific exceptions and be able to > skip items from being retried on ES indefinitely and not block entirely the > pipeline. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33305] Allow to customize arg line for connector's surefire plugin [flink-connector-shared-utils]
echauchot commented on code in PR #25: URL: https://github.com/apache/flink-connector-shared-utils/pull/25#discussion_r1412014261 ## pom.xml: ## @@ -558,6 +563,8 @@ under the License. US en ${project.basedir} + + ${surefire.module.config} Review Comment: please elaborate on why the property is being exposed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33305] Allow to customize arg line for connector's surefire plugin [flink-connector-shared-utils]
echauchot commented on PR #25: URL: https://github.com/apache/flink-connector-shared-utils/pull/25#issuecomment-1835994166 >> @snuyanzin can you elaborate on the use cases you have in mind for systemPropertyVariable surefire.module.config ? > The use case is testing against jdk17/21. The issue with these jdks is that sometimes it is from one side required to add `add-opens/export` flags for tests like it is done e.g. for flink-core https://github.com/apache/flink/blob/a024f6329593bee1188080ca41808b8ac77b0660/flink-core/pom.xml#L44-L58 > > Similarly, it is required for connector's code. From another side it's better to keep existing surefire flags. + in case of jdk8 it makes sense to have `-XX:+IgnoreUnrecognizedVMOptions` to make it non-failing because of all these `add-[opens|exports]` yes, it was clear already in the comment in the pom. I meant, can you elaborate on why this: ` ${surefire.module.config}` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
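For context, the pattern under discussion is a Maven property that JDK-specific profiles fill in and the surefire `argLine` picks up. A hedged sketch of how such a setup commonly looks — the element placement, profile id, and the particular `--add-opens` flag are illustrative, not taken from the actual flink-connector-parent pom:

```xml
<properties>
  <!-- Empty by default; JDK-specific profiles override it. -->
  <surefire.module.config></surefire.module.config>
</properties>

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-surefire-plugin</artifactId>
      <configuration>
        <!-- IgnoreUnrecognizedVMOptions keeps JDK 8 builds from failing on add-opens flags. -->
        <argLine>-XX:+IgnoreUnrecognizedVMOptions ${surefire.module.config}</argLine>
      </configuration>
    </plugin>
  </plugins>
</build>

<profiles>
  <profile>
    <id>java17plus</id>
    <activation><jdk>[17,)</jdk></activation>
    <properties>
      <surefire.module.config>--add-opens=java.base/java.util=ALL-UNNAMED</surefire.module.config>
    </properties>
  </profile>
</profiles>
```

The `argLine` interpolation is what actually carries the flags to the forked test JVM; exposing the same value as a system property is a separate choice, which is what the review question above is probing.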
[jira] [Updated] (FLINK-32028) Error handling for the new Elasticsearch sink
[ https://issues.apache.org/jira/browse/FLINK-32028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32028: --- Labels: pull-request-available (was: ) > Error handling for the new Elasticsearch sink > - > > Key: FLINK-32028 > URL: https://issues.apache.org/jira/browse/FLINK-32028 > Project: Flink > Issue Type: Improvement > Components: Connectors / ElasticSearch >Affects Versions: 1.16.1 >Reporter: Tudor Plugaru >Priority: Major > Labels: pull-request-available > > The deprecated ElasticsearchSink supports setting an error handler via a > [public method > |https://github.com/apache/flink-connector-elasticsearch/blob/8f75d4e059c09b55cc3a44bab3e64330b1246d27/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch7/ElasticsearchSink.java#L216] > but the new sink, does not. > Ideally there would be a way to handle ES specific exceptions and be able to > skip items from being retried on ES indefinitely and not block entirely the > pipeline. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]
boring-cyborg[bot] commented on PR #83: URL: https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1835980923 Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]
schulzp opened a new pull request, #83: URL: https://github.com/apache/flink-connector-elasticsearch/pull/83 Extracted `BulkResponseInspector` interface to allow custom handling of (partially) failed bulk requests. If not overridden, default behaviour remains unchanged and partial failures are escalated. * fixes https://issues.apache.org/jira/browse/FLINK-32028 * allows custom metrics to be exposed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
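The idea — inspect the bulk response and decide whether to escalate — can be sketched self-contained. The type names below mirror the PR's description, but the shapes are stand-ins written for illustration, not the actual Elasticsearch client or connector APIs:

```java
import java.util.Arrays;
import java.util.List;

public class InspectorSketch {
    /** Stand-in for Elasticsearch's BulkResponse: just the ids of failed items. */
    static class BulkResponse {
        final List<String> failedItemIds;
        BulkResponse(String... failed) { this.failedItemIds = Arrays.asList(failed); }
        boolean hasFailures() { return !failedItemIds.isEmpty(); }
    }

    /** The extracted hook: callers plug in their own handling of bulk results. */
    @FunctionalInterface
    interface BulkResponseInspector {
        void inspect(BulkResponse response);
    }

    /** Default behaviour: escalate partial failures, keeping today's semantics. */
    static final BulkResponseInspector FAIL_FAST = response -> {
        if (response.hasFailures()) {
            throw new IllegalStateException("bulk failures: " + response.failedItemIds);
        }
    };

    /** Custom behaviour: record failed items (e.g. as a metric) and move on. */
    static BulkResponseInspector skipping(List<String> skipped) {
        return response -> skipped.addAll(response.failedItemIds);
    }
}
```

Keeping `FAIL_FAST` as the default is what makes the change backwards compatible: users who never supply an inspector see exactly the old escalation behaviour.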
[jira] [Created] (FLINK-33725) MathUtils.isPowerOf2 does not cover the case of value=0
Jes Cok created FLINK-33725: --- Summary: MathUtils.isPowerOf2 does not cover the case of value=0 Key: FLINK-33725 URL: https://issues.apache.org/jira/browse/FLINK-33725 Project: Flink Issue Type: Improvement Components: API / Core Reporter: Jes Cok org.apache.flink.util.MathUtils.isPowerOf2: this static method does not cover the case of value=0. Should the documentation explain that value cannot be 0? Or could it be re-implemented as the following code?

public static boolean isPowerOf2(long value) {
    return value > 0 && (value & (value - 1)) == 0;
}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
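The reported edge case is easy to verify: `0 - 1` is `-1` (all bits set), so `0 & -1 == 0` and an unguarded bit-trick check wrongly classifies 0 as a power of two. A quick stand-alone demonstration — `isPowerOf2Naive` mirrors the unguarded formula for illustration and is not copied from MathUtils:

```java
public class PowerOf2Demo {
    // Unguarded formula: wrongly returns true for 0, since 0 & -1 == 0.
    static boolean isPowerOf2Naive(long value) {
        return (value & (value - 1)) == 0;
    }

    // Guarded variant proposed in the issue: 0 (and negatives) are rejected.
    static boolean isPowerOf2(long value) {
        return value > 0 && (value & (value - 1)) == 0;
    }
}
```

The `value > 0` guard costs one comparison and also closes the `Long.MIN_VALUE` case, where the unguarded formula returns true as well.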