[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation
yunfengzhou-hub commented on a change in pull request #32:
URL: https://github.com/apache/flink-ml/pull/32#discussion_r764623110

## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/naivebayes/NaiveBayesModelData.java ##

@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.naivebayes;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The model data of {@link NaiveBayesModel}.
+ *
+ * This class also provides methods to convert model data between Table and Datastream, and
+ * classes to save/load model data.
+ */
+public class NaiveBayesModelData implements Serializable {
+    public final Map[][] theta;
+    public final double[] piArray;
+    public final int[] labels;
+
+    public NaiveBayesModelData(Map[][] theta, double[] piArray, int[] labels) {
+        this.theta = theta;
+        this.piArray = piArray;
+        this.labels = labels;
+    }
+
+    /** Converts the provided modelData Datastream into corresponding Table. */
+    public static Table getModelDataTable(DataStream stream) {
+        StreamTableEnvironment tEnv =
+                StreamTableEnvironment.create(stream.getExecutionEnvironment());
+        return tEnv.fromDataStream(stream);
+    }
+
+    /** Converts the provided modelData Table into corresponding DataStream. */
+    public static DataStream getModelDataStream(Table table) {
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) table).getTableEnvironment();
+        return tEnv.toDataStream(table)
+                .map(
+                        (MapFunction)
+                                row -> (NaiveBayesModelData) row.getField("f0"));
+    }
+
+    /** Encoder for the {@link NaiveBayesModelData}. */
+    public static class ModelDataEncoder implements Encoder {
+        @Override
+        public void encode(NaiveBayesModelData modelData, OutputStream outputStream) {
+            Output output = new Output(outputStream);
+
+            output.writeInt(modelData.labels.length);

Review comment:
OK. I'll make the change.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
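The encoder in the diff above starts by writing `modelData.labels.length` before the labels themselves. The sketch below illustrates that length-prefix pattern using only the JDK. It swaps in `java.io.DataOutputStream` for the Kryo `Output` used in the pull request, and the class and method names are hypothetical, so the real `ModelDataEncoder` may lay out its fields differently.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical illustration of length-prefixed encoding; not the encoder from the pull request. */
final class LengthPrefixedLabels {

    /** Writes the array length first so that a reader knows how many ints follow. */
    static byte[] encode(int[] labels) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(labels.length);
            for (int label : labels) {
                out.writeInt(label);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the length prefix, then exactly that many ints. */
    static int[] decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int[] labels = new int[in.readInt()];
            for (int i = 0; i < labels.length; i++) {
                labels[i] = in.readInt();
            }
            return labels;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Without the prefix, a decoder reading several arrays from one stream could not tell where one array ends and the next begins.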
[GitHub] [flink] flinkbot edited a comment on pull request #16582: [FLINK-21504][checkpoint] Introduce notification of subsumed checkpoint
flinkbot edited a comment on pull request #16582: URL: https://github.com/apache/flink/pull/16582#issuecomment-885598535 ## CI report: * 4575a58b2365b9ab070606108ef6ba1b4507dc24 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27764) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #17692: [FLINK-24489][CEP] The size of entryCache & eventsBufferCache in the SharedBuffer should be defined with a threshold to limit the num
flinkbot edited a comment on pull request #17692: URL: https://github.com/apache/flink/pull/17692#issuecomment-961748638 ## CI report: * 5378879134e1c08ada5239c650d2ce77cff0f3e3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27776)
[GitHub] [flink] xiangqiao123 commented on a change in pull request #17952: [FLINK-25011][runtime] Introduce vertex parallelism decider and the default implementation.
xiangqiao123 commented on a change in pull request #17952:
URL: https://github.com/apache/flink/pull/17952#discussion_r764615819

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDecider.java ##

@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link VertexParallelismDecider}. */
+public class DefaultVertexParallelismDecider implements VertexParallelismDecider {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DefaultVertexParallelismDecider.class);
+
+    /**
+     * The cap ratio of broadcast bytes to data volume per task. The cap ratio is 0.5 currently
+     * because we usually expect the broadcast dataset to be smaller than non-broadcast. We can make
+     * it configurable later if we see users requesting for it.
+     */
+    private static final double CAP_RATIO_OF_BROADCAST = 0.5;
+
+    private final int maxParallelism;
+    private final int minParallelism;
+    private final long dataVolumePerTask;
+    private final int defaultSourceParallelism;
+
+    private DefaultVertexParallelismDecider(
+            int maxParallelism,
+            int minParallelism,
+            MemorySize dataVolumePerTask,
+            int defaultSourceParallelism) {
+
+        checkArgument(minParallelism > 0, "Invalid minimum parallelism.");
+        checkArgument(
+                maxParallelism >= minParallelism,
+                "Maximum parallelism should greater than minimum parallelism.");

Review comment:
Maximum parallelism should be greater than or equal to minimum parallelism.
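The review comment points out that the condition `maxParallelism >= minParallelism` accepts equality, so the error message should say so. Below is a minimal sketch of that idea in plain Java. The class `ParallelismBounds` and its local `checkArgument` helper are hypothetical stand-ins for the constructor in the diff and for Flink's `Preconditions.checkArgument`. The message wording is one possible phrasing, not the text that was eventually merged.

```java
/** Hypothetical stand-in for the constructor checks in the diff; not Flink code. */
final class ParallelismBounds {
    final int minParallelism;
    final int maxParallelism;

    ParallelismBounds(int minParallelism, int maxParallelism) {
        checkArgument(minParallelism > 0, "Minimum parallelism must be greater than 0.");
        // The condition accepts equality, so the message states "or equal to" as well.
        checkArgument(
                maxParallelism >= minParallelism,
                "Maximum parallelism must be greater than or equal to minimum parallelism.");
        this.minParallelism = minParallelism;
        this.maxParallelism = maxParallelism;
    }

    /** Local helper: throws IllegalArgumentException when the condition is false. */
    private static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }
}
```

Keeping the message in step with the condition means a user who sets both bounds to the same value is not told, incorrectly, that the configuration is invalid.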
[GitHub] [flink] flinkbot edited a comment on pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not the same
flinkbot edited a comment on pull request #17939: URL: https://github.com/apache/flink/pull/17939#issuecomment-981420515 ## CI report: * a124edc8f7930c1f9a7078418b2117646e464a0e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27773)
[jira] [Closed] (FLINK-18165) When savingpoint is restored, select the checkpoint directory and stateBackend
[ https://issues.apache.org/jira/browse/FLINK-18165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang closed FLINK-18165.
    Resolution: Fixed

This ticket has been resolved by FLINK-20976.

> When savingpoint is restored, select the checkpoint directory and stateBackend
> --
>
> Key: FLINK-18165
> URL: https://issues.apache.org/jira/browse/FLINK-18165
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Affects Versions: 1.9.0
> Environment: flink 1.9
> Reporter: Xinyuan Liu
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> If a checkpoint file is used as the initial state when starting from a
> savepoint, the state backend must be of the same type before and after the
> restore. In production, however, state keeps growing. When TaskManager
> memory becomes insufficient and the cluster cannot be expanded, the state
> backend needs to be switched while still ensuring data consistency.
> Unfortunately, Flink currently does not provide an elegant way to switch
> state backends. Can the community consider this proposal?

--
This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] flinkbot edited a comment on pull request #17655: [FLINK-24728][tests] Add tests to ensure SQL file sink closes all created files
flinkbot edited a comment on pull request #17655: URL: https://github.com/apache/flink/pull/17655#issuecomment-958717470 ## CI report: * b3867f269b1f83df6e44493def7c78a18e962a5d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26396) * 558b57cbfaf1a062c49b74682f905cc65b06f2c3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27783)
[jira] [Closed] (FLINK-19008) Flink Job runs slow after restore + downscale from an incremental checkpoint (rocksdb)
[ https://issues.apache.org/jira/browse/FLINK-19008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang closed FLINK-19008.
    Resolution: Information Provided

Flink has bumped the RocksDB version to 6.20.3, which adopts this compaction priority by default. However, I don't think this change will improve performance much. Let's see whether the problem still exists after Flink 1.14.

> Flink Job runs slow after restore + downscale from an incremental checkpoint (rocksdb)
> --
>
> Key: FLINK-19008
> URL: https://issues.apache.org/jira/browse/FLINK-19008
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Jun Qin
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor, perfomance, usability
>
> A customer runs a Flink job with the RocksDB state backend. Checkpoints are
> retained and done incrementally. The state size is several TB. When they
> restore + downscale from a retained checkpoint, although the downloading of
> checkpoint files took ~20min, the job throughput returns to the expected
> level only after 3 hours.
> I do not have RocksDB logs. The suspicion is that those 3 hours are due to
> heavy RocksDB compaction and/or flush. It was also observed that checkpoints
> could not finish fast enough because of a long {{checkpoint duration (sync)}}.
> How can we make this restoring phase shorter?
> For compaction, I think it is worth checking the improvement of:
> {code:c}
> CompactionPri compaction_pri = kMinOverlappingRatio;{code}
> which has been set to default in RocksDB 6.x:
> {code:c}
> // In Level-based compaction, it Determines which file from a level to be
> // picked to merge to the next level. We suggest people try
> // kMinOverlappingRatio first when you tune your database.
> enum CompactionPri : char {
>   // Slightly prioritize larger files by size compensated by #deletes
>   kByCompensatedSize = 0x0,
>   // First compact files whose data's latest update time is oldest.
>   // Try this if you only update some hot keys in small ranges.
>   kOldestLargestSeqFirst = 0x1,
>   // First compact files whose range hasn't been compacted to the next level
>   // for the longest. If your updates are random across the key space,
>   // write amplification is slightly better with this option.
>   kOldestSmallestSeqFirst = 0x2,
>   // First compact files whose ratio between overlapping size in next level
>   // and its size is the smallest. It in many cases can optimize write
>   // amplification.
>   kMinOverlappingRatio = 0x3,
> };
> ...
> // Default: kMinOverlappingRatio
> CompactionPri compaction_pri = kMinOverlappingRatio;{code}
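The quoted RocksDB comment describes `kMinOverlappingRatio` as compacting first the file whose ratio of overlapping size in the next level to its own size is the smallest. The sketch below models only that selection rule. The class `MinOverlapPicker`, its `LevelFile` type, and the field names are hypothetical, and RocksDB's actual picker is more involved than a single minimum.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical model of the kMinOverlappingRatio idea; not RocksDB code. */
final class MinOverlapPicker {

    /** A file in one level, with its size and the bytes it overlaps in the next level. */
    static final class LevelFile {
        final String name;
        final long sizeBytes;
        final long overlappingBytesInNextLevel;

        LevelFile(String name, long sizeBytes, long overlappingBytesInNextLevel) {
            if (sizeBytes <= 0) {
                throw new IllegalArgumentException("sizeBytes must be positive");
            }
            this.name = name;
            this.sizeBytes = sizeBytes;
            this.overlappingBytesInNextLevel = overlappingBytesInNextLevel;
        }

        /** Bytes that must be rewritten in the next level per byte of this file. */
        double overlapRatio() {
            return (double) overlappingBytesInNextLevel / sizeBytes;
        }
    }

    /** Picks the file with the smallest overlap ratio, or empty if there are no files. */
    static Optional<LevelFile> pick(List<LevelFile> files) {
        return files.stream().min(Comparator.comparingDouble(LevelFile::overlapRatio));
    }
}
```

A low ratio means that merging the file rewrites comparatively little data in the next level, which is why the quoted comment associates this option with lower write amplification.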
[GitHub] [flink] flinkbot edited a comment on pull request #17655: [FLINK-24728][tests] Add tests to ensure SQL file sink closes all created files
flinkbot edited a comment on pull request #17655: URL: https://github.com/apache/flink/pull/17655#issuecomment-958717470 ## CI report: * b3867f269b1f83df6e44493def7c78a18e962a5d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26396) * 558b57cbfaf1a062c49b74682f905cc65b06f2c3 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #18050: [FLINK-25034][runtime] Support flexible number of subpartitions in IntermediateResultPartition
flinkbot edited a comment on pull request #18050: URL: https://github.com/apache/flink/pull/18050#issuecomment-988466514 ## CI report: * 5e36ba1eacf427ca4665de18b6f71253fb53f0e8 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27771) * daeb5ff033438d2aa6f314a756ca9f1c5f5838b5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27778)
[jira] [Closed] (FLINK-23532) Unify stop-with-savepoint w drain and w/o drain
[ https://issues.apache.org/jira/browse/FLINK-23532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz closed FLINK-23532.
    Resolution: Implemented

Implemented in 72a2471fdb08a625cbe173ef89b53db8425a14b6..2b167ae9764c02d4c77a41f2afd056ed8fd5f04f

> Unify stop-with-savepoint w drain and w/o drain
> --
>
> Key: FLINK-23532
> URL: https://issues.apache.org/jira/browse/FLINK-23532
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Checkpointing, Runtime / Task
> Reporter: Dawid Wysakowicz
> Assignee: Dawid Wysakowicz
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.15.0
>
> The code paths for stop-with-savepoint --drain and w/o drain are different
> after FLINK-23408. We should unify the two and in both cases we should wait
> for the savepoint in afterInvoke and close sources before triggering the last
> checkpoint.
[GitHub] [flink] flinkbot edited a comment on pull request #18014: [FLINK-24857][test][Kafka] Upgrade SourceReaderTestBase t…
flinkbot edited a comment on pull request #18014: URL: https://github.com/apache/flink/pull/18014#issuecomment-986441468 ## CI report: * a00cd7df6de0a7db9dc0517a8eeb9e0cd8465a6a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27723) * d21e6dcc5ea43433e5efe0b46a8dca9c520b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27781)
[GitHub] [flink] dawidwys merged pull request #17968: [FLINK-23532] Unify stop-with-savepoint without and with drain
dawidwys merged pull request #17968: URL: https://github.com/apache/flink/pull/17968
[GitHub] [flink] flinkbot edited a comment on pull request #18014: [FLINK-24857][test][Kafka] Upgrade SourceReaderTestBase t…
flinkbot edited a comment on pull request #18014: URL: https://github.com/apache/flink/pull/18014#issuecomment-986441468 ## CI report: * a00cd7df6de0a7db9dc0517a8eeb9e0cd8465a6a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27723) * d21e6dcc5ea43433e5efe0b46a8dca9c520b UNKNOWN
[jira] [Commented] (FLINK-20996) Using a cryptographically weak Pseudo Random Number Generator (PRNG)
[ https://issues.apache.org/jira/browse/FLINK-20996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17455009#comment-17455009 ]

Yun Tang commented on FLINK-20996:
--

[~yaxiao] AbstractTtlStateVerifier is only a class used for tests and does not run in a production environment, so I don't see how it could be attacked in a security context.

> Using a cryptographically weak Pseudo Random Number Generator (PRNG)
> --
>
> Key: FLINK-20996
> URL: https://issues.apache.org/jira/browse/FLINK-20996
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Ya Xiao
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> We are a security research team at Virginia Tech. We are doing an empirical
> study about the usefulness of the existing security vulnerability detection
> tools. The following is a vulnerability reported by certain tools. We would
> greatly appreciate any feedback on it.
> *Vulnerability Description:*
> The file
> [flink/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java|https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java]
> uses java.util.Random instead of java.security.SecureRandom at Line 39.
> *Security Impact:*
> java.util.Random is not cryptographically strong and may expose sensitive
> information to certain types of attacks when used in a security context.
> *Useful Resources*:
> [https://cwe.mitre.org/data/definitions/338.html]
> *Solution we suggest:*
> Replace it with SecureRandom
> *Please share with us your opinions/comments if there is any:*
> Is the bug report helpful?
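The exchange above turns on when `java.util.Random` is acceptable and when `java.security.SecureRandom` is needed. The sketch below contrasts the two using only JDK classes. The class `RandomChoice` and its methods are hypothetical and say nothing about how `AbstractTtlStateVerifier` actually uses its generator.

```java
import java.security.SecureRandom;
import java.util.Random;

/** Hypothetical contrast of the two generators; not code from the Flink repository. */
final class RandomChoice {

    /** A seeded java.util.Random is reproducible, which is convenient for test data. */
    static long[] seededTestData(long seed, int count) {
        Random random = new Random(seed);
        long[] values = new long[count];
        for (int i = 0; i < count; i++) {
            values[i] = random.nextLong();
        }
        return values;
    }

    /** SecureRandom is the appropriate choice for secrets such as tokens or keys. */
    static byte[] secretToken(int numBytes) {
        byte[] token = new byte[numBytes];
        new SecureRandom().nextBytes(token);
        return token;
    }
}
```

Reproducibility is the reason a fixed-seed `Random` suits test code: a failing run can be replayed with the same inputs. The predictability that makes this possible is exactly what rules it out for security-sensitive values.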
[GitHub] [flink] JingsongLi commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not th
JingsongLi commented on a change in pull request #17939:
URL: https://github.com/apache/flink/pull/17939#discussion_r764598800

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java ##

@@ -150,6 +150,25 @@
                                     + "or force materialization(FORCE).")
                     .build());
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption TABLE_EXEC_SINK_KEYED_SHUFFLE =
+            key("table.exec.sink.keyed-shuffle")
+                    .enumType(SinkKeyedShuffle.class)
+                    .defaultValue(SinkKeyedShuffle.AUTO)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "In order to minimize the distributed disorder problem when writing data into table with primary keys that many users suffers. "
+                                                    + "FLINK will auto add a keyed shuffle by default when the sink's parallelism differs from upstream operator and upstream is append only. "
+                                                    + "This works only when the upstream ensures the multi-records' order on the primary key, if not, the added shuffle can not solve "

Review comment:
`the multi-records' order`?
[GitHub] [flink] flinkbot edited a comment on pull request #17988: [FLINK-25010][Connectors/Hive] Speed up hive's createMRSplits by multi thread
flinkbot edited a comment on pull request #17988: URL: https://github.com/apache/flink/pull/17988#issuecomment-984363654 ## CI report: * 92f5f7c95542c9a08863324d5c885106e554a6c2 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27768)
[GitHub] [flink] flinkbot edited a comment on pull request #17462: [FLINK-23170] Write metadata after materialization
flinkbot edited a comment on pull request #17462: URL: https://github.com/apache/flink/pull/17462#issuecomment-942024789 ## CI report: * ddc8a47426d41b78f448556c5b9a8bbf3ad8c19b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27767)
[jira] [Closed] (FLINK-21726) Fix checkpoint stuck
[ https://issues.apache.org/jira/browse/FLINK-21726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang closed FLINK-21726.
    Fix Version/s: (was: 1.15.0)
                   (was: 1.14.1)
    Resolution: Information Provided

Since we have already bumped the RocksDB version, the fix for this problem is included in that version.

> Fix checkpoint stuck
> --
>
> Key: FLINK-21726
> URL: https://issues.apache.org/jira/browse/FLINK-21726
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.11.3, 1.12.2, 1.13.0
> Reporter: future
> Priority: Minor
> Labels: auto-deprioritized-critical, auto-deprioritized-major
>
> h1. 1. Bug description:
> When RocksDB takes a checkpoint, it may get stuck in the
> `WaitUntilFlushWouldNotStallWrites` method.
> h1. 2. Simple analysis of the reasons:
> h2. 2.1 Configuration parameters:
>
> {code:java}
> # Flink yaml:
> state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM
> state.backend.rocksdb.compaction.style: UNIVERSAL
> # corresponding RocksDB config
> Compaction Style : Universal
> max_write_buffer_number : 4
> min_write_buffer_number_to_merge : 3{code}
> A checkpoint is usually very fast. When the checkpoint is executed,
> `WaitUntilFlushWouldNotStallWrites` is called. If there are 2 immutable
> MemTables, which is fewer than `min_write_buffer_number_to_merge`, they will
> not be flushed, but execution still enters this code:
>
> {code:java}
> // method: GetWriteStallConditionAndCause
> if (mutable_cf_options.max_write_buffer_number> 3 &&
>     num_unflushed_memtables >=
>         mutable_cf_options.max_write_buffer_number-1) {
>   return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
> }
> {code}
> code link:
> [https://github.com/facebook/rocksdb/blob/fbed72f03c3d9e4fdca3e5993587ef2559ba6ab9/db/column_family.cc#L847]
> The checkpoint assumes that a FlushJob is pending, but there is none, so it
> waits forever.
> h2. 2.2 solution:
> Add a restriction: wait only when the number of immutable MemTables >=
> `min_write_buffer_number_to_merge`.
> The rocksdb community has fixed this bug, link:
> [https://github.com/facebook/rocksdb/pull/7921]
> h2. 2.3 Code that can reproduce the bug:
> [https://github.com/1996fanrui/fanrui-learning/blob/flink-1.12/module-java/src/main/java/com/dream/rocksdb/RocksDBCheckpointStuck.java]
> h1. 3. Interesting point
> This bug is triggered only when `the number of sorted runs >=
> level0_file_num_compaction_trigger`, because there is a break in
> WaitUntilFlushWouldNotStallWrites.
> {code:java}
> if (cfd->imm()->NumNotFlushed() <
>         cfd->ioptions()->min_write_buffer_number_to_merge &&
>     vstorage->l0_delay_trigger_count() <
>         mutable_cf_options.level0_file_num_compaction_trigger) {
>   break;
> }
> {code}
> code link:
> [https://github.com/facebook/rocksdb/blob/fbed72f03c3d9e4fdca3e5993587ef2559ba6ab9/db/db_impl/db_impl_compaction_flush.cc#L1974]
> Universal compaction may have `l0_delay_trigger_count() >=
> level0_file_num_compaction_trigger`, so this bug is triggered.
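The two conditions quoted in the ticket interact in a way that is easier to follow with concrete numbers. The sketch below models them in plain Java. The class `FlushWaitModel` is hypothetical. The `+ 1` for the active memtable and the rule that a flush is scheduled only once `min_write_buffer_number_to_merge` immutable memtables exist are assumptions drawn from the ticket's description, not verified against RocksDB.

```java
/** Hypothetical model of the two conditions quoted in the ticket; not RocksDB code. */
final class FlushWaitModel {

    /** Mirrors the quoted memtable-limit check from GetWriteStallConditionAndCause. */
    static boolean wouldDelayWrites(int maxWriteBufferNumber, int numUnflushedMemtables) {
        return maxWriteBufferNumber > 3 && numUnflushedMemtables >= maxWriteBufferNumber - 1;
    }

    /** Mirrors the quoted early exit ("break") in WaitUntilFlushWouldNotStallWrites. */
    static boolean canSkipWaiting(
            int numImmutableNotFlushed,
            int minWriteBufferNumberToMerge,
            int l0DelayTriggerCount,
            int level0FileNumCompactionTrigger) {
        return numImmutableNotFlushed < minWriteBufferNumberToMerge
                && l0DelayTriggerCount < level0FileNumCompactionTrigger;
    }

    /** True when the waiter sees a stall condition that no pending flush will clear. */
    static boolean waitsWithoutFlush(
            int maxWriteBufferNumber,
            int minWriteBufferNumberToMerge,
            int numImmutableNotFlushed,
            int l0DelayTriggerCount,
            int level0FileNumCompactionTrigger) {
        // Assumption: the waiter also counts the active memtable, hence the + 1.
        boolean stall = wouldDelayWrites(maxWriteBufferNumber, numImmutableNotFlushed + 1);
        // Assumption from the ticket: fewer immutable memtables than the merge
        // threshold means that no flush is scheduled.
        boolean flushScheduled = numImmutableNotFlushed >= minWriteBufferNumberToMerge;
        boolean skipped =
                canSkipWaiting(
                        numImmutableNotFlushed,
                        minWriteBufferNumberToMerge,
                        l0DelayTriggerCount,
                        level0FileNumCompactionTrigger);
        return stall && !skipped && !flushScheduled;
    }
}
```

With the ticket's settings (`max_write_buffer_number` 4, `min_write_buffer_number_to_merge` 3) and 2 immutable memtables, the model reports a wait with no flush once the L0 count reaches the compaction trigger, and no wait while the L0 count is below it. That matches the "interesting point" in section 3 of the ticket.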
[jira] [Commented] (FLINK-22962) Key group is not in KeyGroupRange error while checkpointing
[ https://issues.apache.org/jira/browse/FLINK-22962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17454991#comment-17454991 ]

Yun Tang commented on FLINK-22962:
--

[~prateekkohli2112] Since Flink 1.13, the savepoint format of the RocksDB and heap keyed state backends has been unified. Do you still face this problem?

> Key group is not in KeyGroupRange error while checkpointing
> --
>
> Key: FLINK-22962
> URL: https://issues.apache.org/jira/browse/FLINK-22962
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.12.1
> Environment: Linux
> Reporter: Prateek Kohli
> Priority: Major
>
> Hi,
>
> We are getting the exception below at checkpointing time when using RocksDB
> as the state backend:
> 2021-06-10 12:05:13,933 INFO org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Aggregator (3/4)#0 - asynchronous part of checkpoint 2 could not be completed.
> java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=5, endKeyGroup=7}.
> at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_261]
> at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_261]
> at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:621) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:122) [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_261]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_261]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
> Caused by: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=5, endKeyGroup=7}.
> at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:144) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:106) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:333) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:264) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:227) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:180) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_261]
> at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:618) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> ... 5 more
>
> When we change the state backend to file or heap we do not get this error.
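The stack trace shows the exception surfacing in `KeyGroupRangeOffsets.computeKeyGroupIndex` while an offset is being set for key group 0 on a subtask that owns key groups 5 to 7. The sketch below is a simplified model of that kind of bounds check. The class `KeyGroupOffsetsSketch` is hypothetical and assumes an inclusive range with one offset slot per key group; it is not Flink's implementation.

```java
/** Hypothetical, simplified model of per-key-group offsets; not Flink's KeyGroupRangeOffsets. */
final class KeyGroupOffsetsSketch {
    private final int startKeyGroup;
    private final int endKeyGroup; // inclusive, by assumption
    private final long[] offsets;

    KeyGroupOffsetsSketch(int startKeyGroup, int endKeyGroup) {
        if (endKeyGroup < startKeyGroup) {
            throw new IllegalArgumentException("endKeyGroup must not be less than startKeyGroup");
        }
        this.startKeyGroup = startKeyGroup;
        this.endKeyGroup = endKeyGroup;
        this.offsets = new long[endKeyGroup - startKeyGroup + 1];
    }

    void setKeyGroupOffset(int keyGroup, long offset) {
        offsets[computeKeyGroupIndex(keyGroup)] = offset;
    }

    long getKeyGroupOffset(int keyGroup) {
        return offsets[computeKeyGroupIndex(keyGroup)];
    }

    /** Maps a key group to its array slot and rejects groups this range does not own. */
    private int computeKeyGroupIndex(int keyGroup) {
        int index = keyGroup - startKeyGroup;
        if (index < 0 || index >= offsets.length) {
            throw new IllegalArgumentException(
                    "Key group " + keyGroup + " is not in KeyGroupRange{startKeyGroup="
                            + startKeyGroup + ", endKeyGroup=" + endKeyGroup + "}.");
        }
        return index;
    }
}
```

In this model the error means that a key whose key group falls outside the subtask's assigned range reached the snapshot writer. The check only reports the mismatch and does not explain how such a key came to be stored there.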
[GitHub] [flink] flinkbot edited a comment on pull request #18049: [FLINK-24813][table-planner-blink] Improve ImplicitTypeConversionITCase
flinkbot edited a comment on pull request #18049: URL: https://github.com/apache/flink/pull/18049#issuecomment-988465299 ## CI report: * da40e9da154b7273f6db5eb12d65c983636cd1e6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27770) * Unknown: [CANCELED](TBD) * e060cdbc3f026f9d242e7d6f3d0f03496a4ffe24 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27780) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #17952: [FLINK-25011][runtime] Introduce vertex parallelism decider and the default implementation.
flinkbot edited a comment on pull request #17952: URL: https://github.com/apache/flink/pull/17952#issuecomment-982356774 ## CI report: * c18948fda6752079a297579c26db0316376390bd Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27720) * d3dd231a393793dabc3a1f218981119f8e4259d7 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27779) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18049: [FLINK-24813][table-planner-blink] Improve ImplicitTypeConversionITCase
flinkbot edited a comment on pull request #18049: URL: https://github.com/apache/flink/pull/18049#issuecomment-988465299 ## CI report: * da40e9da154b7273f6db5eb12d65c983636cd1e6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27770) * Unknown: [CANCELED](TBD) * e060cdbc3f026f9d242e7d6f3d0f03496a4ffe24 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #17952: [FLINK-25011][runtime] Introduce vertex parallelism decider and the default implementation.
flinkbot edited a comment on pull request #17952: URL: https://github.com/apache/flink/pull/17952#issuecomment-982356774 ## CI report: * c18948fda6752079a297579c26db0316376390bd Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27720) * d3dd231a393793dabc3a1f218981119f8e4259d7 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] xuyangzhong commented on pull request #18049: [FLINK-24813][table-planner-blink] Improve ImplicitTypeConversionITCase
xuyangzhong commented on pull request #18049: URL: https://github.com/apache/flink/pull/18049#issuecomment-988539996 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] wanglijie95 commented on a change in pull request #17952: [FLINK-25011][runtime] Introduce vertex parallelism decider and the default implementation.
wanglijie95 commented on a change in pull request #17952: URL: https://github.com/apache/flink/pull/17952#discussion_r764582435 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDecider.java ## @@ -81,52 +88,36 @@ private int calculateParallelism(List consumedResults) { .reduce(0L, Long::sum)) .sum(); -if (broadcastBytes > dataVolumePerTask -|| (broadcastBytes == dataVolumePerTask && nonBroadcastBytes > 0)) { -LOG.warn( -"The minimum size of one task to process is larger than " -+ "the size of data volume which is configured by " -+ "'" -+ JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.key() -+ "'. " -+ "Parallelism will be set to {}.", -maxParallelism); - -return maxParallelism; -} else if (broadcastBytes == dataVolumePerTask) { -return minParallelism; -} else { -int parallelism = -(int) -Math.ceil( -(double) nonBroadcastBytes -/ (dataVolumePerTask - broadcastBytes)); -parallelism = Math.max(parallelism, minParallelism); -parallelism = Math.min(parallelism, maxParallelism); -return parallelism; -} -} - -/** The factory to instantiate {@link DefaultVertexParallelismDecider}. */ -public static class Factory implements VertexParallelismDecider.Factory { +long expectedMaxBroadcastBytes = +(long) Math.ceil((dataVolumePerTask * CAP_RATIO_OF_BROADCAST)); -private final Configuration configuration; +if (broadcastBytes > expectedMaxBroadcastBytes) { +LOG.info( +"The number of broadcast bytes: {} is larger than the expected maximum value: {} ('{}' * {})." 
++ " Use the expected maximum value as the number of broadcast bytes to decide the parallelism.", +broadcastBytes, +expectedMaxBroadcastBytes, + JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.key(), +CAP_RATIO_OF_BROADCAST); -public Factory(Configuration configuration) { -this.configuration = configuration; +broadcastBytes = expectedMaxBroadcastBytes; } -@Override -public VertexParallelismDecider create() { -return new DefaultVertexParallelismDecider( -configuration.getInteger( - JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM), -configuration.getInteger( - JobManagerOptions.ADAPTIVE_BACH_SCHEDULER_MIN_PARALLELISM), -configuration.get( - JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK), -configuration.get( - JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DEFAULT_SOURCE_PARALLELISM)); -} +int parallelism = +(int) Math.ceil((double) nonBroadcastBytes / (dataVolumePerTask - broadcastBytes)); +parallelism = Math.max(parallelism, minParallelism); +parallelism = Math.min(parallelism, maxParallelism); +return parallelism; +} + +public static DefaultVertexParallelismDecider from(Configuration configuration) { +return new DefaultVertexParallelismDecider( Review comment: Yes, you are right. I will add the checks in ctor. The `dataVolumePerTask` is `MemorySize` type, I will check it's not null. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
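The new logic in the diff above (cap the broadcast bytes, divide the remaining data volume, clamp to the configured bounds) can be tried in isolation with the sketch below. It is only an illustration under stated assumptions: the class name `ParallelismSketch` is hypothetical, `CAP_RATIO_OF_BROADCAST` is set to an assumed `0.5` because the quoted diff does not show its value, and `dataVolumePerTask` is a plain `long` in bytes rather than the `MemorySize` mentioned in the review comment. The argument checks are one possible form of the constructor checks discussed in the comment, not the ones in the PR.

```java
public class ParallelismSketch {
    // Assumed value for illustration only; the real constant is not shown in the quoted diff.
    static final double CAP_RATIO_OF_BROADCAST = 0.5;

    static int decideParallelism(
            long broadcastBytes,
            long nonBroadcastBytes,
            long dataVolumePerTask,
            int minParallelism,
            int maxParallelism) {
        // Hypothetical sanity checks, in the spirit of the "checks in ctor" discussed above.
        if (dataVolumePerTask <= 0) {
            throw new IllegalArgumentException("dataVolumePerTask must be positive");
        }
        if (minParallelism <= 0 || maxParallelism < minParallelism) {
            throw new IllegalArgumentException("require 0 < minParallelism <= maxParallelism");
        }

        // Cap the broadcast bytes so that they never consume the whole per-task budget.
        long expectedMaxBroadcastBytes =
                (long) Math.ceil(dataVolumePerTask * CAP_RATIO_OF_BROADCAST);
        long cappedBroadcastBytes = Math.min(broadcastBytes, expectedMaxBroadcastBytes);

        // Every task receives all broadcast data plus a share of the non-broadcast data.
        int parallelism =
                (int) Math.ceil(
                        (double) nonBroadcastBytes
                                / (dataVolumePerTask - cappedBroadcastBytes));

        // Clamp the result to [minParallelism, maxParallelism].
        parallelism = Math.max(parallelism, minParallelism);
        parallelism = Math.min(parallelism, maxParallelism);
        return parallelism;
    }

    public static void main(String[] args) {
        // No broadcast data: 1000 bytes at 100 bytes per task.
        System.out.println(decideParallelism(0, 1000, 100, 1, 128)); // 10
        // 80 broadcast bytes are capped to 50, leaving 50 bytes per task for other data.
        System.out.println(decideParallelism(80, 1000, 100, 1, 128)); // 20
        // A very large input is clamped to maxParallelism.
        System.out.println(decideParallelism(0, 1_000_000, 100, 1, 128)); // 128
    }
}
```

With the cap in place the denominator stays positive whenever the ratio is below 1 and the data volume is more than a byte or two, which is presumably why the old `broadcastBytes == dataVolumePerTask` branches could be removed.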
[jira] [Commented] (FLINK-25132) KafkaSource cannot work with object-reusing DeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-25132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17454980#comment-17454980 ]

Qingsheng Ren commented on FLINK-25132:
---------------------------------------

[~mason6345] I think so. We'll backport the fix to 1.13 after fixing this on master.

> KafkaSource cannot work with object-reusing DeserializationSchema
> -----------------------------------------------------------------
>
> Key: FLINK-25132
> URL: https://issues.apache.org/jira/browse/FLINK-25132
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.14.0, 1.15.0, 1.14.1
> Reporter: Qingsheng Ren
> Assignee: Qingsheng Ren
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1
>
> Currently the Kafka source deserializes ConsumerRecords in the split reader and puts
> them into the elementQueue; the task's main thread then polls these records from
> the queue asynchronously. This mechanism cannot cooperate with
> DeserializationSchemas that reuse objects: all records staying in the element
> queue point to the same object.
> A solution would be to move deserialization to the RecordEmitter, which works in
> the task's main thread.
> Note that this issue actually affects all sources that do deserialization
> in the split reader.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
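The aliasing problem described in this issue can be reproduced without Flink or Kafka. The following is a minimal sketch, not the connector's code: `MutableRecord`, `ReusingDeserializer` and both method names are hypothetical stand-ins for an object-reusing DeserializationSchema, the split reader and the RecordEmitter. It contrasts deserializing before queueing (every queued element is the same instance) with queueing the raw data and deserializing on the consuming side.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class ObjectReuseSketch {
    /** Hypothetical mutable record type. */
    static final class MutableRecord {
        String value;
    }

    /** Hypothetical deserializer that returns the same instance on every call. */
    static final class ReusingDeserializer {
        private final MutableRecord reused = new MutableRecord();

        MutableRecord deserialize(String raw) {
            reused.value = raw;
            return reused;
        }
    }

    /** Deserialize first, then queue: every queue entry aliases one object. */
    static List<String> deserializeThenQueue(List<String> rawRecords) {
        ReusingDeserializer deserializer = new ReusingDeserializer();
        Queue<MutableRecord> elementQueue = new ArrayDeque<>();
        for (String raw : rawRecords) {
            elementQueue.add(deserializer.deserialize(raw));
        }
        List<String> seenByConsumer = new ArrayList<>();
        for (MutableRecord record : elementQueue) {
            seenByConsumer.add(record.value);
        }
        return seenByConsumer;
    }

    /** Queue the raw data and deserialize only when the consumer emits a record. */
    static List<String> queueThenDeserialize(List<String> rawRecords) {
        ReusingDeserializer deserializer = new ReusingDeserializer();
        Queue<String> elementQueue = new ArrayDeque<>(rawRecords);
        List<String> seenByConsumer = new ArrayList<>();
        for (String raw : elementQueue) {
            seenByConsumer.add(deserializer.deserialize(raw).value);
        }
        return seenByConsumer;
    }

    public static void main(String[] args) {
        List<String> raw = Arrays.asList("a", "b", "c");
        System.out.println(deserializeThenQueue(raw)); // [c, c, c]
        System.out.println(queueThenDeserialize(raw)); // [a, b, c]
    }
}
```

The second variant corresponds to the proposed fix in spirit: the reused object is only ever read immediately after it is filled, so reuse is harmless.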
[GitHub] [flink] MyLanPangzi commented on a change in pull request #14376: [FLINK-18202][PB format] New Format of protobuf
MyLanPangzi commented on a change in pull request #14376: URL: https://github.com/apache/flink/pull/14376#discussion_r764575993 ## File path: flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatUtils.java ## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.util.FlinkRuntimeException; + +import com.google.protobuf.Descriptors; +import org.apache.commons.lang3.StringUtils; + +/** Protobuf function util. */ +public class PbFormatUtils { + +/** + * protobuf code has a bug that, f_abc_7d will be convert to fAbc7d in {@code + * com.google.protobuf.Descriptors.FileDescriptor.getJsonName()}, but actually we need fAbc7D. 
+ */ +public static String fieldNameToJsonName(String name) { +final int length = name.length(); +StringBuilder result = new StringBuilder(length); +boolean isNextUpperCase = false; +for (int i = 0; i < length; i++) { +char ch = name.charAt(i); +if (ch == '_') { +isNextUpperCase = true; +} else if (isNextUpperCase) { +if ('a' <= ch && ch <= 'z') { +ch = (char) (ch - 'a' + 'A'); +isNextUpperCase = false; +} +result.append(ch); +} else { +result.append(ch); +} +} +return result.toString(); +} + +private static String getJavaPackageFromProtoFile(Descriptors.Descriptor descriptor) { +boolean hasJavaPackage = descriptor.getFile().getOptions().hasJavaPackage(); +if (hasJavaPackage) { +String javaPackage = descriptor.getFile().getOptions().getJavaPackage(); +if (StringUtils.isBlank(javaPackage)) { +throw new FlinkRuntimeException("java_package cannot be blank string"); +} +return javaPackage; +} else { +String packageName = descriptor.getFile().getPackage(); +if (StringUtils.isBlank(packageName)) { +throw new FlinkRuntimeException("package and java_package cannot both be empty"); +} +return packageName; +} +} + +public static String getFullJavaName(Descriptors.Descriptor descriptor) { +String javaPackageName = getJavaPackageFromProtoFile(descriptor); +if (descriptor.getFile().getOptions().getJavaMultipleFiles()) { +// multiple_files=true +if (null != descriptor.getContainingType()) { +// nested type +String parentJavaFullName = getFullJavaName(descriptor.getContainingType()); +return parentJavaFullName + "." + descriptor.getName(); +} else { +// top level message +return javaPackageName + "." + descriptor.getName(); +} +} else { +// multiple_files=false +if (null != descriptor.getContainingType()) { +// nested type +String parentJavaFullName = getFullJavaName(descriptor.getContainingType()); +return parentJavaFullName + "." 
+ descriptor.getName(); +} else { +// top level message +if (!descriptor.getFile().getOptions().hasJavaOuterClassname()) { +// user do not define outer class name in proto file +return javaPackageName
Review comment: @maosuhan
1. I found a bug. When I don't declare the **java_outer_classname=XXX** or **java_multiple_files=true** option in the .proto file (proto2), the generated class name is x.y.z.xxxOuterClass.MyPBClass, which is incorrect. Generated code: `public static RowData decode(data.LogOuterClass.Log message){`. My protobuf class: `data.Data.Log`.
2. How are inner classes identified?
-- This is an automated message from the Apache Git Service. To respond to the message, please log
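For readers who want to check the `fieldNameToJsonName` conversion quoted in the diff above, here is a standalone sketch of the same loop. The class name `PbJsonNameSketch` is hypothetical and the two branches that append `ch` are merged, but the behavior is intended to match the quoted method: an underscore is dropped and the next lowercase ASCII letter after it is upper-cased, even if digits sit in between.

```java
public class PbJsonNameSketch {
    static String fieldNameToJsonName(String name) {
        StringBuilder result = new StringBuilder(name.length());
        boolean isNextUpperCase = false;
        for (int i = 0; i < name.length(); i++) {
            char ch = name.charAt(i);
            if (ch == '_') {
                // Drop the underscore and remember to capitalize the next letter.
                isNextUpperCase = true;
            } else {
                if (isNextUpperCase && 'a' <= ch && ch <= 'z') {
                    ch = (char) (ch - 'a' + 'A');
                    isNextUpperCase = false;
                }
                // Digits and other characters are copied and leave the flag set.
                result.append(ch);
            }
        }
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(fieldNameToJsonName("f_abc_7d")); // fAbc7D
        System.out.println(fieldNameToJsonName("my_field_name")); // myFieldName
    }
}
```

The `f_abc_7d` case is the one named in the Javadoc: the digit `7` does not consume the pending capitalization, so the following `d` becomes `D`.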
[GitHub] [flink] flinkbot edited a comment on pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …
flinkbot edited a comment on pull request #16108: URL: https://github.com/apache/flink/pull/16108#issuecomment-856641884 ## CI report: * d7d66d77b7ac632cee04bfe17c7671e048535fc1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-23391) KafkaSourceReaderTest.testKafkaSourceMetrics fails on azure
[ https://issues.apache.org/jira/browse/FLINK-23391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17454978#comment-17454978 ]

Qingsheng Ren commented on FLINK-23391:
---------------------------------------

This one is quite similar to FLINK-20928, probably caused by the flaky {{NetworkFailureProxy}} between Flink and Kafka. I'll try adding a retry first and then refactor the current Kafka test infra after removing the legacy {{FlinkKafkaConsumer}} and {{FlinkKafkaProducer}}.

> KafkaSourceReaderTest.testKafkaSourceMetrics fails on azure
> -----------------------------------------------------------
>
> Key: FLINK-23391
> URL: https://issues.apache.org/jira/browse/FLINK-23391
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.14.0, 1.13.1
> Reporter: Xintong Song
> Assignee: Qingsheng Ren
> Priority: Major
> Labels: pull-request-available, test-stability
> Fix For: 1.15.0, 1.14.1, 1.13.4
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=20456=logs=c5612577-f1f7-5977-6ff6-7432788526f7=53f6305f-55e6-561c-8f1e-3a1dde2c77df=6783
> {code}
> Jul 14 23:00:26 [ERROR] Tests run: 10, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 99.93 s <<< FAILURE! - in org.apache.flink.connector.kafka.source.reader.KafkaSourceReaderTest
> Jul 14 23:00:26 [ERROR] testKafkaSourceMetrics(org.apache.flink.connector.kafka.source.reader.KafkaSourceReaderTest) Time elapsed: 60.225 s <<< ERROR!
> Jul 14 23:00:26 java.util.concurrent.TimeoutException: Offsets are not committed successfully. Dangling offsets: {15213={KafkaSourceReaderTest-0=OffsetAndMetadata{offset=10, leaderEpoch=null, metadata=''}}}
> Jul 14 23:00:26 at org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:210)
> Jul 14 23:00:26 at org.apache.flink.connector.kafka.source.reader.KafkaSourceReaderTest.testKafkaSourceMetrics(KafkaSourceReaderTest.java:275)
> Jul 14 23:00:26 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> Jul 14 23:00:26 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jul 14 23:00:26 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jul 14 23:00:26 at java.lang.reflect.Method.invoke(Method.java:498)
> Jul 14 23:00:26 at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> Jul 14 23:00:26 at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jul 14 23:00:26 at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> Jul 14 23:00:26 at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jul 14 23:00:26 at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Jul 14 23:00:26 at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
> Jul 14 23:00:26 at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jul 14 23:00:26 at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> Jul 14 23:00:26 at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Jul 14 23:00:26 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> Jul 14 23:00:26 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> Jul 14 23:00:26 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> Jul 14 23:00:26 at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> Jul 14 23:00:26 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> Jul 14 23:00:26 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> Jul 14 23:00:26 at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> Jul 14 23:00:26 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> Jul 14 23:00:26 at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Jul 14 23:00:26 at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Jul 14 23:00:26 at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> Jul 14 23:00:26 at org.junit.runners.Suite.runChild(Suite.java:128)
> Jul 14 23:00:26 at org.junit.runners.Suite.runChild(Suite.java:27)
> Jul 14 23:00:26 at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> Jul 14 23:00:26 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> Jul 14 23:00:26 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> Jul
[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation
yunfengzhou-hub commented on a change in pull request #32: URL: https://github.com/apache/flink-ml/pull/32#discussion_r764572915 ## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/naivebayes/NaiveBayesModel.java ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.classification.naivebayes; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.src.FileSource; +import org.apache.flink.core.fs.Path; +import org.apache.flink.ml.api.Model; +import org.apache.flink.ml.common.broadcast.BroadcastUtils; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.linalg.BLAS; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.linalg.Vector; +import org.apache.flink.ml.linalg.Vectors; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.ArrayUtils; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; 
+import java.util.function.Function; + +/** A Model which classifies data using the model data computed by {@link NaiveBayes}. */ +public class NaiveBayesModel +implements Model, NaiveBayesModelParams { +private final Map, Object> paramMap = new HashMap<>(); +private Table modelDataTable; + +public NaiveBayesModel() { +ParamUtils.initializeMapWithDefaultValues(paramMap, this); +} + +@Override +public Table[] transform(Table... inputs) { +Preconditions.checkArgument(inputs.length == 1); + +final String predictionCol = getPredictionCol(); +final String featuresCol = getFeaturesCol(); +final String broadcastModelKey = "NaiveBayesModelStream"; + +RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema()); +RowTypeInfo outputTypeInfo = +new RowTypeInfo( +ArrayUtils.addAll( +inputTypeInfo.getFieldTypes(), TypeInformation.of(Integer.class)), +ArrayUtils.addAll(inputTypeInfo.getFieldNames(), predictionCol)); + +StreamTableEnvironment tEnv = +(StreamTableEnvironment) ((TableImpl) modelDataTable).getTableEnvironment(); +DataStream modelDataStream = +NaiveBayesModelData.getModelDataStream(modelDataTable); +DataStream input = tEnv.toDataStream(inputs[0]); + +Map> broadcastMap = new HashMap<>(); +broadcastMap.put(broadcastModelKey, modelDataStream); + +Function>, DataStream> function = +dataStreams -> { +DataStream stream = dataStreams.get(0); +return stream.transform( +this.getClass().getSimpleName(), +outputTypeInfo, +new
[GitHub] [flink] flinkbot edited a comment on pull request #18050: [FLINK-25034][runtime] Support flexible number of subpartitions in IntermediateResultPartition
flinkbot edited a comment on pull request #18050: URL: https://github.com/apache/flink/pull/18050#issuecomment-988466514 ## CI report: * 5e36ba1eacf427ca4665de18b6f71253fb53f0e8 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27771) * daeb5ff033438d2aa6f314a756ca9f1c5f5838b5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27778) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18050: [FLINK-25034][runtime] Support flexible number of subpartitions in IntermediateResultPartition
flinkbot edited a comment on pull request #18050: URL: https://github.com/apache/flink/pull/18050#issuecomment-988466514 ## CI report: * 5e36ba1eacf427ca4665de18b6f71253fb53f0e8 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27771) * daeb5ff033438d2aa6f314a756ca9f1c5f5838b5 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] deadwind4 commented on pull request #18033: [FLINK-25141][connector/elasticsearch] add sink parallelism option
deadwind4 commented on pull request #18033: URL: https://github.com/apache/flink/pull/18033#issuecomment-988524457 @fapaul please review it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …
flinkbot edited a comment on pull request #16108: URL: https://github.com/apache/flink/pull/16108#issuecomment-856641884 ## CI report: * 850a2914588ba4bf362f5b800a053708c32bac22 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27774) * d7d66d77b7ac632cee04bfe17c7671e048535fc1 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …
flinkbot edited a comment on pull request #16108: URL: https://github.com/apache/flink/pull/16108#issuecomment-856641884 ## CI report: * 5f040e7cb9eddcadd58fe0f37047c1fbd872512c Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27769) * 850a2914588ba4bf362f5b800a053708c32bac22 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27774) * d7d66d77b7ac632cee04bfe17c7671e048535fc1 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …
flinkbot edited a comment on pull request #16108: URL: https://github.com/apache/flink/pull/16108#issuecomment-856641884 ## CI report: * 5f040e7cb9eddcadd58fe0f37047c1fbd872512c Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27769) * 850a2914588ba4bf362f5b800a053708c32bac22 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27774) * d7d66d77b7ac632cee04bfe17c7671e048535fc1 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] paul8263 commented on a change in pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …
paul8263 commented on a change in pull request #16108: URL: https://github.com/apache/flink/pull/16108#discussion_r764562387 ## File path: flink-core/src/main/java/org/apache/flink/util/FileLock.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.channels.OverlappingFileLockException; +import java.nio.file.Paths; + +/** A file lock used for avoiding race condition among multiple unit test processes */ +public class FileLock { +private static final long TRY_LOCK_INTERVAL = 50; +private final File file; +private FileOutputStream outputStream; +private java.nio.channels.FileLock lock; + +public FileLock(String fileName) { +this.file = new File(fileName); +} + +private void init() throws IOException { +String filePath = +Paths.get(System.getProperty("java.io.tmpdir"), file.getName()).toString(); +File lockFile = new File(filePath); +if (!lockFile.exists()) { +lockFile.createNewFile(); +} + +outputStream = new FileOutputStream(lockFile); +} Review comment: Hi @AHeise , `IOException` might not be thrown if the file already exists. 
We would get this exception if the parent directory of the file has not been created. Sorry, I messed up the code while I was testing the snippet somewhere else. There is no need to create another `lockFile`. I reconsidered the `tryLock` issue: I think releasing/destroying the lock should happen outside `tryLock` and be called explicitly, even if acquiring the lock failed. Correct me if I am wrong. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
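One way to read the proposal in this comment (keep `tryLock` free of cleanup and release explicitly, whether or not the lock was acquired) is sketched below. This is not the PR's `FileLock` class: `TestFileLockSketch` and `demo()` are hypothetical names, and using `RandomAccessFile` with `AutoCloseable` is an assumption made to keep the example short. Only standard `java.nio.channels` calls are used.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;

public class TestFileLockSketch implements AutoCloseable {
    private final File file;
    private RandomAccessFile raf;
    private FileLock lock;

    public TestFileLockSketch(File file) {
        this.file = file;
    }

    /** Tries to acquire the lock once; never releases anything itself. */
    public boolean tryLock() throws IOException {
        if (raf == null) {
            // "rw" creates the file if needed; it fails if the parent directory is missing.
            raf = new RandomAccessFile(file, "rw");
        }
        try {
            lock = raf.getChannel().tryLock(); // null if another process holds the lock
        } catch (OverlappingFileLockException e) {
            lock = null; // the lock is already held inside this JVM
        }
        return lock != null;
    }

    /** Explicit cleanup; safe to call even when tryLock() returned false. */
    @Override
    public void close() throws IOException {
        try {
            if (lock != null) {
                lock.release();
                lock = null;
            }
        } finally {
            if (raf != null) {
                raf.close();
                raf = null;
            }
        }
    }

    /** Runs three lock attempts on one temp file and reports each result. */
    static boolean[] demo() {
        try {
            File f = File.createTempFile("file-lock-sketch", ".lock");
            f.deleteOnExit();
            boolean first;
            boolean second;
            try (TestFileLockSketch a = new TestFileLockSketch(f);
                    TestFileLockSketch b = new TestFileLockSketch(f)) {
                first = a.tryLock();
                second = b.tryLock(); // fails while a holds the lock
            }
            boolean third;
            try (TestFileLockSketch c = new TestFileLockSketch(f)) {
                third = c.tryLock(); // succeeds after both were closed
            }
            return new boolean[] {first, second, third};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // true false true
    }
}
```

With try-with-resources the caller cannot forget the release, including on the failed-acquire path. One caveat from the `FileLock` Javadoc: on some systems closing any channel on a file releases all locks the JVM holds on it, so real code would preferably use a single channel per lock file.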
[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf
flinkbot edited a comment on pull request #14376: URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765 ## CI report: * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN * 5cf0fa22d2ad2e24c9b4c088097999f2eff8f106 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27772) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #17692: [FLINK-24489][CEP] The size of entryCache & eventsBufferCache in the SharedBuffer should be defined with a threshold to limit the num
flinkbot edited a comment on pull request #17692: URL: https://github.com/apache/flink/pull/17692#issuecomment-961748638 ## CI report: * 5ee2449e410094eec0c7523a4dfee58091668398 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27678) * 5378879134e1c08ada5239c650d2ce77cff0f3e3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27776) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #17692: [FLINK-24489][CEP] The size of entryCache & eventsBufferCache in the SharedBuffer should be defined with a threshold to limit the num
flinkbot edited a comment on pull request #17692: URL: https://github.com/apache/flink/pull/17692#issuecomment-961748638 ## CI report: * 5ee2449e410094eec0c7523a4dfee58091668398 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27678) * 5378879134e1c08ada5239c650d2ce77cff0f3e3 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …
flinkbot edited a comment on pull request #16108: URL: https://github.com/apache/flink/pull/16108#issuecomment-856641884 ## CI report: * 5f040e7cb9eddcadd58fe0f37047c1fbd872512c Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27769) * 850a2914588ba4bf362f5b800a053708c32bac22 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27774) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18048: [FLINK-20370][table] part1: Fix wrong results when sink primary key is not the same with query result's changelog upsert key
flinkbot edited a comment on pull request #18048: URL: https://github.com/apache/flink/pull/18048#issuecomment-988409163 ## CI report: * 03c879c08980eedf2bba6b46a6e5eeb0a0df31b8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27763) * c10e864f5d0434074c43910f6eef5d2ae1aba793 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27775) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18048: [FLINK-20370][table] part1: Fix wrong results when sink primary key is not the same with query result's changelog upsert key
flinkbot edited a comment on pull request #18048: URL: https://github.com/apache/flink/pull/18048#issuecomment-988409163 ## CI report: * 03c879c08980eedf2bba6b46a6e5eeb0a0df31b8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27763) * c10e864f5d0434074c43910f6eef5d2ae1aba793 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18048: [FLINK-20370][table] part1: Fix wrong results when sink primary key is not the same with query result's changelog upsert key
flinkbot edited a comment on pull request #18048: URL: https://github.com/apache/flink/pull/18048#issuecomment-988409163 ## CI report: * 03c879c08980eedf2bba6b46a6e5eeb0a0df31b8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27763) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …
flinkbot edited a comment on pull request #16108: URL: https://github.com/apache/flink/pull/16108#issuecomment-856641884 ## CI report: * a696994e8970d98e2f77aa7a4e1db93bb0ecbe53 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27718) * 5f040e7cb9eddcadd58fe0f37047c1fbd872512c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27769) * 850a2914588ba4bf362f5b800a053708c32bac22 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27774) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …
flinkbot edited a comment on pull request #16108: URL: https://github.com/apache/flink/pull/16108#issuecomment-856641884 ## CI report: * a696994e8970d98e2f77aa7a4e1db93bb0ecbe53 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27718) * 5f040e7cb9eddcadd58fe0f37047c1fbd872512c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27769) * 850a2914588ba4bf362f5b800a053708c32bac22 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not the same
flinkbot edited a comment on pull request #17939: URL: https://github.com/apache/flink/pull/17939#issuecomment-981420515 ## CI report: * 9ccb311aec0b5f0d14197c5de37a171ea15c6097 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27244) * a124edc8f7930c1f9a7078418b2117646e464a0e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27773) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf
flinkbot edited a comment on pull request #14376: URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765 ## CI report: * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN * 888452faa35e98b1151416484edfcf9e097f680f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27741) * 5cf0fa22d2ad2e24c9b4c088097999f2eff8f106 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27772) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not the same
flinkbot edited a comment on pull request #17939: URL: https://github.com/apache/flink/pull/17939#issuecomment-981420515 ## CI report: * 9ccb311aec0b5f0d14197c5de37a171ea15c6097 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27244) * a124edc8f7930c1f9a7078418b2117646e464a0e UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] lincoln-lil commented on pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not the same for
lincoln-lil commented on pull request #17939:
URL: https://github.com/apache/flink/pull/17939#issuecomment-988482409

Thanks for your review, @JingsongLi! I've addressed your comments and updated the PR.
[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf
flinkbot edited a comment on pull request #14376: URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765 ## CI report: * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN * 888452faa35e98b1151416484edfcf9e097f680f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27741) * 5cf0fa22d2ad2e24c9b4c088097999f2eff8f106 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf
flinkbot edited a comment on pull request #14376: URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765 ## CI report: * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN * 888452faa35e98b1151416484edfcf9e097f680f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27741) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zhuzhurk commented on a change in pull request #17952: [FLINK-25011][runtime] Introduce vertex parallelism decider and the default implementation.
zhuzhurk commented on a change in pull request #17952:
URL: https://github.com/apache/flink/pull/17952#discussion_r764533772

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDecider.java ##

@@ -81,52 +88,36 @@ private int calculateParallelism(List consumedResults) {
                                         .reduce(0L, Long::sum))
                         .sum();
 
-        if (broadcastBytes > dataVolumePerTask
-                || (broadcastBytes == dataVolumePerTask && nonBroadcastBytes > 0)) {
-            LOG.warn(
-                    "The minimum size of one task to process is larger than "
-                            + "the size of data volume which is configured by "
-                            + "'"
-                            + JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.key()
-                            + "'. "
-                            + "Parallelism will be set to {}.",
-                    maxParallelism);
-
-            return maxParallelism;
-        } else if (broadcastBytes == dataVolumePerTask) {
-            return minParallelism;
-        } else {
-            int parallelism =
-                    (int)
-                            Math.ceil(
-                                    (double) nonBroadcastBytes
-                                            / (dataVolumePerTask - broadcastBytes));
-            parallelism = Math.max(parallelism, minParallelism);
-            parallelism = Math.min(parallelism, maxParallelism);
-            return parallelism;
-        }
-    }
-
-    /** The factory to instantiate {@link DefaultVertexParallelismDecider}. */
-    public static class Factory implements VertexParallelismDecider.Factory {
+        long expectedMaxBroadcastBytes =
+                (long) Math.ceil((dataVolumePerTask * CAP_RATIO_OF_BROADCAST));
 
-        private final Configuration configuration;
+        if (broadcastBytes > expectedMaxBroadcastBytes) {
+            LOG.info(
+                    "The number of broadcast bytes: {} is larger than the expected maximum value: {} ('{}' * {})."
+                            + " Use the expected maximum value as the number of broadcast bytes to decide the parallelism.",
+                    broadcastBytes,
+                    expectedMaxBroadcastBytes,
+                    JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.key(),
+                    CAP_RATIO_OF_BROADCAST);
 
-        public Factory(Configuration configuration) {
-            this.configuration = configuration;
+            broadcastBytes = expectedMaxBroadcastBytes;
         }
 
-        @Override
-        public VertexParallelismDecider create() {
-            return new DefaultVertexParallelismDecider(
-                    configuration.getInteger(
-                            JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM),
-                    configuration.getInteger(
-                            JobManagerOptions.ADAPTIVE_BACH_SCHEDULER_MIN_PARALLELISM),
-                    configuration.get(
-                            JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK),
-                    configuration.get(
-                            JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DEFAULT_SOURCE_PARALLELISM));
-        }
+        int parallelism =
+                (int) Math.ceil((double) nonBroadcastBytes / (dataVolumePerTask - broadcastBytes));
+        parallelism = Math.max(parallelism, minParallelism);
+        parallelism = Math.min(parallelism, maxParallelism);
+        return parallelism;
+    }
+
+    public static DefaultVertexParallelismDecider from(Configuration configuration) {
+        return new DefaultVertexParallelismDecider(

Review comment:
I think we need to validate the parameters here, e.g. minParallelism > 0, minParallelism < maxParallelism, dataVolumePerTask > 0, and defaultSourceParallelism > 0.
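The checks requested in the review comment could look roughly like the sketch below. It is a standalone illustration, not the pull request's class: `ParallelismDeciderSketch`, its `checkArgument` helper, and the value `0.5` for `CAP_RATIO_OF_BROADCAST` are assumptions (the diff references the constant without showing its definition), and `dataVolumePerTask` is modelled as a plain byte count. The strict `minParallelism < maxParallelism` condition follows the comment as written; whether equality should be allowed is left open.

```java
/** Hypothetical sketch of constructor-time validation; not the class from the pull request. */
final class ParallelismDeciderSketch {
    // Assumed value for illustration; the diff does not show the real definition.
    private static final double CAP_RATIO_OF_BROADCAST = 0.5;

    private final int maxParallelism;
    private final int minParallelism;
    private final long dataVolumePerTask; // bytes
    private final int defaultSourceParallelism;

    ParallelismDeciderSketch(
            int maxParallelism,
            int minParallelism,
            long dataVolumePerTask,
            int defaultSourceParallelism) {
        // The four conditions listed in the review comment.
        checkArgument(minParallelism > 0, "minParallelism must be positive");
        checkArgument(minParallelism < maxParallelism, "minParallelism must be less than maxParallelism");
        checkArgument(dataVolumePerTask > 0, "dataVolumePerTask must be positive");
        checkArgument(defaultSourceParallelism > 0, "defaultSourceParallelism must be positive");

        this.maxParallelism = maxParallelism;
        this.minParallelism = minParallelism;
        this.dataVolumePerTask = dataVolumePerTask;
        this.defaultSourceParallelism = defaultSourceParallelism;
    }

    /** Mirrors the added lines of the diff: cap the broadcast bytes, then clamp the result. */
    int calculateParallelism(long broadcastBytes, long nonBroadcastBytes) {
        long expectedMaxBroadcastBytes =
                (long) Math.ceil(dataVolumePerTask * CAP_RATIO_OF_BROADCAST);
        broadcastBytes = Math.min(broadcastBytes, expectedMaxBroadcastBytes);

        int parallelism =
                (int) Math.ceil((double) nonBroadcastBytes / (dataVolumePerTask - broadcastBytes));
        parallelism = Math.max(parallelism, minParallelism);
        return Math.min(parallelism, maxParallelism);
    }

    int getDefaultSourceParallelism() {
        return defaultSourceParallelism;
    }

    // Local stand-in for a precondition utility.
    private static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }
}
```

Failing fast in the constructor means a misconfigured value surfaces when the decider is built rather than as an odd parallelism later on.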
[jira] [Updated] (FLINK-25218) Performance issues with lookup join accessing external dimension tables
[ https://issues.apache.org/jira/browse/FLINK-25218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ZhuoYu Chen updated FLINK-25218:
    Affects Version/s: 1.15.0

> Performance issues with lookup join accessing external dimension tables
>
>              Key: FLINK-25218
>              URL: https://issues.apache.org/jira/browse/FLINK-25218
>          Project: Flink
>       Issue Type: Improvement
>       Components: Table SQL / API
> Affects Versions: 1.15.0
>         Reporter: ZhuoYu Chen
>         Priority: Major
>
> The current lookup join accesses the external dimension table once for each input record to fetch the result and emit an output record.
> Implement a lookup join that improves performance by batching and delaying lookups.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
[jira] [Created] (FLINK-25218) Performance issues with lookup join accessing external dimension tables
ZhuoYu Chen created FLINK-25218:
---

    Summary: Performance issues with lookup join accessing external dimension tables
        Key: FLINK-25218
        URL: https://issues.apache.org/jira/browse/FLINK-25218
    Project: Flink
 Issue Type: Improvement
 Components: Table SQL / API
   Reporter: ZhuoYu Chen

The current lookup join accesses the external dimension table once for each input record to fetch the result and emit an output record.
Implement a lookup join that improves performance by batching and delaying lookups.
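To make the batching idea in the issue concrete, here is a small sketch that buffers lookup keys and resolves them with one bulk call per batch. It uses no Flink API: `BatchingLookupSketch`, the `bulkLookup` function, and the `output` callback are hypothetical names, and the "delaying" part is reduced to an explicit `flush()` that a timer would be expected to call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical sketch of a batched lookup; not a Flink operator. */
final class BatchingLookupSketch<K, V> {
    private final int maxBatchSize;
    private final Function<List<K>, Map<K, V>> bulkLookup; // one round trip per batch
    private final BiConsumer<K, V> output;
    private final List<K> buffer = new ArrayList<>();

    BatchingLookupSketch(
            int maxBatchSize,
            Function<List<K>, Map<K, V>> bulkLookup,
            BiConsumer<K, V> output) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.maxBatchSize = maxBatchSize;
        this.bulkLookup = bulkLookup;
        this.output = output;
    }

    /** Buffers the key; triggers a bulk lookup once the batch is full. */
    void add(K key) {
        buffer.add(key);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Resolves all buffered keys with a single lookup call and emits results in arrival order. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<K> batch = new ArrayList<>(buffer);
        buffer.clear();
        Map<K, V> found = bulkLookup.apply(batch);
        for (K key : batch) {
            output.accept(key, found.get(key)); // value is null when the key has no match
        }
    }
}
```

The trade-off is latency for throughput: records wait in the buffer until the batch fills or `flush()` is called, in exchange for fewer round trips to the external table.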
[GitHub] [flink] maosuhan commented on pull request #14376: [FLINK-18202][PB format] New Format of protobuf
maosuhan commented on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-988473669

@MyLanPangzi I have updated the code to adaptively convert between protobuf enums and Flink numeric/string types, and I have added unit tests. Since codegen in Java is a little tricky and hard to read, I have converted the source files containing codegen logic to Scala.
[GitHub] [flink] lincoln-lil commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not t
lincoln-lil commented on a change in pull request #17939:
URL: https://github.com/apache/flink/pull/17939#discussion_r764528915

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java ##

@@ -140,6 +140,25 @@
                                     + "or force materialization(FORCE).")
                     .build());
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption TABLE_EXEC_SINK_SHUFFLE_BY_PK =
+            key("table.exec.sink.pk-shuffle")

Review comment:
I prefer 'keyed-shuffle'. What do you think?
[GitHub] [flink] lincoln-lil commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not t
lincoln-lil commented on a change in pull request #17939:
URL: https://github.com/apache/flink/pull/17939#discussion_r764527157

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java ##

@@ -140,6 +140,25 @@
                                     + "or force materialization(FORCE).")
                     .build());
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption TABLE_EXEC_SINK_SHUFFLE_BY_PK =
+            key("table.exec.sink.pk-shuffle")

Review comment:
I've considered several candidates, but none of them is fully satisfying. 'key-by-mode' is a little subtle; what about 'key-by-pk'?
[GitHub] [flink] flinkbot edited a comment on pull request #18050: [FLINK-25034][runtime] Support flexible number of subpartitions in IntermediateResultPartition
flinkbot edited a comment on pull request #18050: URL: https://github.com/apache/flink/pull/18050#issuecomment-988466514 ## CI report: * 5e36ba1eacf427ca4665de18b6f71253fb53f0e8 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27771) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18049: [FLINK-24813][table-planner-blink] Improve ImplicitTypeConversionITCase
flinkbot edited a comment on pull request #18049: URL: https://github.com/apache/flink/pull/18049#issuecomment-988465299 ## CI report: * da40e9da154b7273f6db5eb12d65c983636cd1e6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27770) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #18050: [FLINK-25034][runtime] Support flexible number of subpartitions in IntermediateResultPartition
flinkbot commented on pull request #18050: URL: https://github.com/apache/flink/pull/18050#issuecomment-988466514 ## CI report: * 5e36ba1eacf427ca4665de18b6f71253fb53f0e8 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #18049: [FLINK-24813][table-planner-blink] Improve ImplicitTypeConversionITCase
flinkbot commented on pull request #18049: URL: https://github.com/apache/flink/pull/18049#issuecomment-988465918 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit da40e9da154b7273f6db5eb12d65c983636cd1e6 (Wed Dec 08 03:29:41 UTC 2021) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #18050: [FLINK-25034][runtime] Support flexible number of subpartitions in IntermediateResultPartition
flinkbot commented on pull request #18050: URL: https://github.com/apache/flink/pull/18050#issuecomment-988465904 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 5e36ba1eacf427ca4665de18b6f71253fb53f0e8 (Wed Dec 08 03:29:39 UTC 2021) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zhuzhurk commented on a change in pull request #17952: [FLINK-25011][runtime] Introduce vertex parallelism decider and the default implementation.
zhuzhurk commented on a change in pull request #17952: URL: https://github.com/apache/flink/pull/17952#discussion_r764522853 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDeciderTest.java ## @@ -58,22 +72,22 @@ public void testSourceJobVertex() { } @Test -public void testBroadcastBytesGreaterThanPerTaskBytes() { -BlockingResultInfo resultInfo1 = -new BlockingResultInfo(Arrays.asList(BYTE_512_MB, BYTE_1_GB), true); -BlockingResultInfo resultInfo2 = new BlockingResultInfo(Arrays.asList(BYTE_1_GB), false); +public void testDecideParallelism() { +BlockingResultInfo resultInfo1 = new BlockingResultInfo(Arrays.asList(BYTE_256_MB), true); +BlockingResultInfo resultInfo2 = +new BlockingResultInfo(Arrays.asList(BYTE_256_MB, BYTE_8_GB), false); int parallelism = decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2)); -assertThat(parallelism, is(MAX_PARALLELISM)); +assertThat(parallelism, is(11)); } @Test -public void testBroadcastBytesEqualToPerTaskBytesAndHaveNonBroadcastBytes() { -BlockingResultInfo resultInfo1 = -new BlockingResultInfo(Arrays.asList(BYTE_512_MB, BYTE_512_MB), true); -BlockingResultInfo resultInfo2 = new BlockingResultInfo(Arrays.asList(BYTE_512_MB), false); +public void testDecidedParallelismIsMaxParallelism() { Review comment: maybe testInitiallyDecidedParallelismIsLargerThanMaxParallelism? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zhuzhurk commented on a change in pull request #17952: [FLINK-25011][runtime] Introduce vertex parallelism decider and the default implementation.
zhuzhurk commented on a change in pull request #17952: URL: https://github.com/apache/flink/pull/17952#discussion_r764522853 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDeciderTest.java ## @@ -58,22 +72,22 @@ public void testSourceJobVertex() { } @Test -public void testBroadcastBytesGreaterThanPerTaskBytes() { -BlockingResultInfo resultInfo1 = -new BlockingResultInfo(Arrays.asList(BYTE_512_MB, BYTE_1_GB), true); -BlockingResultInfo resultInfo2 = new BlockingResultInfo(Arrays.asList(BYTE_1_GB), false); +public void testDecideParallelism() { +BlockingResultInfo resultInfo1 = new BlockingResultInfo(Arrays.asList(BYTE_256_MB), true); +BlockingResultInfo resultInfo2 = +new BlockingResultInfo(Arrays.asList(BYTE_256_MB, BYTE_8_GB), false); int parallelism = decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2)); -assertThat(parallelism, is(MAX_PARALLELISM)); +assertThat(parallelism, is(11)); } @Test -public void testBroadcastBytesEqualToPerTaskBytesAndHaveNonBroadcastBytes() { -BlockingResultInfo resultInfo1 = -new BlockingResultInfo(Arrays.asList(BYTE_512_MB, BYTE_512_MB), true); -BlockingResultInfo resultInfo2 = new BlockingResultInfo(Arrays.asList(BYTE_512_MB), false); +public void testDecidedParallelismIsMaxParallelism() { Review comment: maybe testInitiallyDecidedParallelismIsLargerThanMinParallelism? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #18049: [FLINK-24813][table-planner-blink] Improve ImplicitTypeConversionITCase
flinkbot commented on pull request #18049: URL: https://github.com/apache/flink/pull/18049#issuecomment-988465299 ## CI report: * da40e9da154b7273f6db5eb12d65c983636cd1e6 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] lincoln-lil commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not t
lincoln-lil commented on a change in pull request #17939: URL: https://github.com/apache/flink/pull/17939#discussion_r764522531 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java ## @@ -232,26 +246,30 @@ private int deriveSinkParallelism( * messages. */ private Transformation applyKeyBy( -ChangelogMode changelogMode, +TableConfig config, Transformation inputTransform, int[] primaryKeys, int sinkParallelism, -boolean upsertMaterialize) { -final int inputParallelism = inputTransform.getParallelism(); -if ((inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT)) -&& !upsertMaterialize) { -return inputTransform; +int inputParallelism, +boolean inputInsertOnly, +boolean needMaterialize) { +boolean sameParallelism = sinkParallelism == inputParallelism; +final ExecutionConfigOptions.SinkShuffleByPk sinkShuffleByPk = + config.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SINK_SHUFFLE_BY_PK); +boolean sinkKeyBy = false; +switch (sinkShuffleByPk) { +case NONE: +break; +case AUTO: +sinkKeyBy = inputInsertOnly && !sameParallelism; +break; +case FORCE: Review comment: The only exception is single parallelism. All other circumstances will force adding a keyby -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-25034) Support flexible number of subpartitions in IntermediateResultPartition
[ https://issues.apache.org/jira/browse/FLINK-25034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-25034: --- Labels: pull-request-available (was: ) > Support flexible number of subpartitions in IntermediateResultPartition > --- > > Key: FLINK-25034 > URL: https://issues.apache.org/jira/browse/FLINK-25034 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Lijie Wang >Assignee: Lijie Wang >Priority: Major > Labels: pull-request-available > > Currently, when a task is deployed, it needs to know the parallelism of its > consumer job vertex. This is because the consumer vertex parallelism is > needed to decide the _numberOfSubpartitions_ of _PartitionDescriptor_ which > is part of the {_}ResultPartitionDeploymentDescriptor{_}. The reason behind > that is, at the moment, for one result partition, different subpartitions > serve different consumer execution vertices. More specifically, one consumer > execution vertex only consumes data from subpartition with the same index. > Considering a dynamic graph, the parallelism of a job vertex may not have > been decided when its upstream vertices are deployed. To enable Flink to work > in this case, we need a way to allow an execution vertex to run without > knowing the parallelism of its consumer job vertices. One basic idea is to > enable multiple subpartitions in one result partition to serve the same > consumer execution vertex. > To achieve this goal, we can set the number of subpartitions to be the *max > parallelism* of the consumer job vertex. When the consumer vertex is > deployed, it should be assigned with a subpartition range to consume. -- This message was sent by Atlassian Jira (v8.20.1#820001)
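The issue text says a consumer vertex "should be assigned with a subpartition range to consume" but does not say how ranges are computed. The following is only a sketch of one possible even split; the class name, method name, and the splitting formula are assumptions for illustration and are not taken from the Flink code base.

```java
/** Hypothetical illustration: evenly splits subpartitions among consumer subtasks. */
final class SubpartitionRangeSketch {

    /**
     * Returns {startIndex, endIndexInclusive} for one consumer subtask.
     *
     * @param consumerIndex index of the consumer subtask, 0-based
     * @param consumerParallelism parallelism decided for the consumer job vertex
     * @param numSubpartitions subpartitions per result partition (e.g. the max parallelism)
     */
    static int[] rangeFor(int consumerIndex, int consumerParallelism, int numSubpartitions) {
        if (consumerIndex < 0 || consumerIndex >= consumerParallelism) {
            throw new IllegalArgumentException("consumerIndex out of range");
        }
        // Use long arithmetic so the multiplication cannot overflow.
        int start = (int) ((long) consumerIndex * numSubpartitions / consumerParallelism);
        int endExclusive =
                (int) ((long) (consumerIndex + 1) * numSubpartitions / consumerParallelism);
        return new int[] {start, endExclusive - 1};
    }

    public static void main(String[] args) {
        // 128 subpartitions consumed by 3 subtasks.
        for (int i = 0; i < 3; i++) {
            int[] range = rangeFor(i, 3, 128);
            System.out.println("consumer " + i + " -> [" + range[0] + ", " + range[1] + "]");
        }
    }
}
```

With this formula the ranges are contiguous and together cover every subpartition exactly once, which is the property the proposal needs regardless of the exact split chosen.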
[GitHub] [flink] wanglijie95 opened a new pull request #18050: [FLINK-25034][runtime] Support flexible number of subpartitions in IntermediateResultPartition
wanglijie95 opened a new pull request #18050: URL: https://github.com/apache/flink/pull/18050 ## What is the purpose of the change Support flexible number of subpartitions in IntermediateResultPartition. When the consumer parallelism is decided, number of subpartitions should be the **parallelism** of consumer job vertex(all to all). When the consumer parallelism is not decided, number of subpartitions should be the **max parallelism** of consumer job vertex. ## Brief change log 5e36ba1eacf427ca4665de18b6f71253fb53f0e8 Support flexible number of subpartitions in IntermediateResultPartition. ## Verifying this change Unit tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**) - The serializers: (**no**) - The runtime per-record code paths (performance sensitive): (**no**) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (**no**) - The S3 file system connector: (**no**) ## Documentation - Does this pull request introduce a new feature? (**no**) - If yes, how is the feature documented? (**not applicable**) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
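The rule in the PR description (use the consumer's parallelism when it is decided, otherwise its max parallelism) can be sketched as below. This is a minimal illustration for the all-to-all case only; `SubpartitionCountSketch` and `numberOfSubpartitions` are hypothetical names, and representing "not decided" as an empty `OptionalInt` is an assumption.

```java
import java.util.OptionalInt;

/** Hypothetical helper; names are illustrative and not taken from the Flink code base. */
final class SubpartitionCountSketch {

    /**
     * Returns the number of subpartitions for an all-to-all result partition.
     *
     * @param consumerParallelism decided parallelism of the consumer job vertex, empty if undecided
     * @param consumerMaxParallelism max parallelism of the consumer job vertex
     */
    static int numberOfSubpartitions(OptionalInt consumerParallelism, int consumerMaxParallelism) {
        // Decided: one subpartition per consumer subtask. Undecided: fall back to the upper bound.
        return consumerParallelism.orElse(consumerMaxParallelism);
    }

    public static void main(String[] args) {
        System.out.println(numberOfSubpartitions(OptionalInt.of(4), 128));
        System.out.println(numberOfSubpartitions(OptionalInt.empty(), 128));
    }
}
```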
[GitHub] [flink] zhuzhurk commented on a change in pull request #17952: [FLINK-25011][runtime] Introduce vertex parallelism decider and the default implementation.
zhuzhurk commented on a change in pull request #17952: URL: https://github.com/apache/flink/pull/17952#discussion_r764522054 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDeciderTest.java ## @@ -82,32 +96,32 @@ public void testBroadcastBytesEqualToPerTaskBytesAndHaveNonBroadcastBytes() { } @Test -public void testBroadcastBytesEqualToPerTaskBytesAndNoNonBroadcastBytes() { -BlockingResultInfo resultInfo = -new BlockingResultInfo(Arrays.asList(BYTE_512_MB, BYTE_512_MB), true); +public void testDecidedParallelismIsMinParallelism() { Review comment: maybe testInitiallyDecidedParallelismIsSmallerThanMinParallelism? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] xuyangzhong opened a new pull request #18049: [FLINK-24813][table-planner-blink] Improve ImplicitTypeConversionITCase
xuyangzhong opened a new pull request #18049: URL: https://github.com/apache/flink/pull/18049 ## What is the purpose of the change Delete the test file ImplicitTypeConversionITCase and merge its test cases into ImplicitConversionEqualsFunctionITCase, so that they follow the same code style as CastFunctionITCase. ## Brief change log - Delete the unused code - Merge the test cases into the existing test cases ## Verifying this change Test cases have been added to verify this change. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …
flinkbot edited a comment on pull request #16108: URL: https://github.com/apache/flink/pull/16108#issuecomment-856641884 ## CI report: * a696994e8970d98e2f77aa7a4e1db93bb0ecbe53 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27718) * 5f040e7cb9eddcadd58fe0f37047c1fbd872512c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27769) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zhuzhurk commented on a change in pull request #17952: [FLINK-25011][runtime] Introduce vertex parallelism decider and the default implementation.
zhuzhurk commented on a change in pull request #17952: URL: https://github.com/apache/flink/pull/17952#discussion_r764519958 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java ## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import java.util.List; + +/** The blocking result info, which will be used to calculate the vertex parallelism. */ +public class BlockingResultInfo { + +private final List blockingPartitionSizes; + +private final boolean isBroadcast; + +public BlockingResultInfo(List blockingPartitionSizes, boolean isBroadcast) { Review comment: I prefer to introduce 2 factory methods to make the flag easy to understand, and then make this ctor private. Maybe `createFromBroadcastResult()` and `createFromNonBroadcastResult()`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
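The review suggestion above (a private constructor plus two named factory methods in place of a boolean flag at the call site) could look roughly like the sketch below. It is not the code from the PR: the class is renamed `BlockingResultInfoSketch`, the element type `Long` is an assumption because the generic parameter is missing from the quoted diff, and the second factory name is read as `createFromNonBroadcastResult`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Sketch of the suggested factory-method shape; not the actual Flink class. */
final class BlockingResultInfoSketch {

    private final List<Long> blockingPartitionSizes; // element type is an assumption
    private final boolean isBroadcast;

    // Private so that callers must go through a factory method that names the flag.
    private BlockingResultInfoSketch(List<Long> blockingPartitionSizes, boolean isBroadcast) {
        this.blockingPartitionSizes =
                Collections.unmodifiableList(new ArrayList<>(blockingPartitionSizes));
        this.isBroadcast = isBroadcast;
    }

    static BlockingResultInfoSketch createFromBroadcastResult(List<Long> sizes) {
        return new BlockingResultInfoSketch(sizes, true);
    }

    static BlockingResultInfoSketch createFromNonBroadcastResult(List<Long> sizes) {
        return new BlockingResultInfoSketch(sizes, false);
    }

    List<Long> getBlockingPartitionSizes() {
        return blockingPartitionSizes;
    }

    boolean isBroadcast() {
        return isBroadcast;
    }

    public static void main(String[] args) {
        BlockingResultInfoSketch broadcast = createFromBroadcastResult(Arrays.asList(256L));
        BlockingResultInfoSketch nonBroadcast =
                createFromNonBroadcastResult(Arrays.asList(256L, 8192L));
        System.out.println(broadcast.isBroadcast() + " " + nonBroadcast.isBroadcast());
    }
}
```

The call sites then read as `createFromBroadcastResult(...)` rather than `new BlockingResultInfo(..., true)`, which is the readability gain the comment asks for.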
[GitHub] [flink] lincoln-lil commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not t
lincoln-lil commented on a change in pull request #17939: URL: https://github.com/apache/flink/pull/17939#discussion_r764519877 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java ## @@ -232,26 +246,30 @@ private int deriveSinkParallelism( * messages. */ private Transformation applyKeyBy( -ChangelogMode changelogMode, +TableConfig config, Transformation inputTransform, int[] primaryKeys, int sinkParallelism, -boolean upsertMaterialize) { -final int inputParallelism = inputTransform.getParallelism(); -if ((inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT)) -&& !upsertMaterialize) { -return inputTransform; +int inputParallelism, +boolean inputInsertOnly, +boolean needMaterialize) { +boolean sameParallelism = sinkParallelism == inputParallelism; +final ExecutionConfigOptions.SinkShuffleByPk sinkShuffleByPk = + config.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SINK_SHUFFLE_BY_PK); +boolean sinkKeyBy = false; +switch (sinkShuffleByPk) { +case NONE: +break; +case AUTO: +sinkKeyBy = inputInsertOnly && !sameParallelism; +break; +case FORCE: +// single parallelism has no problem +sinkKeyBy = !(sinkParallelism == 1 && inputParallelism == 1); Review comment: I thought this highlighted the single-parallelism exception, but now it feels like there's no difference between the two versions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
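For readers following the thread, the key-by decision in the quoted diff can be isolated into a small standalone function. This is a sketch that mirrors the three `switch` branches shown above; the class `SinkKeyBySketch`, the method `needsKeyBy`, and the locally declared enum are stand-ins so the example compiles without Flink on the classpath.

```java
/** Standalone sketch of the NONE / AUTO / FORCE decision from the quoted diff. */
final class SinkKeyBySketch {

    /** Local stand-in for ExecutionConfigOptions.SinkShuffleByPk. */
    enum SinkShuffleByPk {
        NONE,
        AUTO,
        FORCE
    }

    static boolean needsKeyBy(
            SinkShuffleByPk mode,
            int sinkParallelism,
            int inputParallelism,
            boolean inputInsertOnly) {
        boolean sameParallelism = sinkParallelism == inputParallelism;
        switch (mode) {
            case AUTO:
                // Only shuffle insert-only input when the parallelism actually changes.
                return inputInsertOnly && !sameParallelism;
            case FORCE:
                // Always shuffle, except when both sides run with a single parallelism.
                return !(sinkParallelism == 1 && inputParallelism == 1);
            case NONE:
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(needsKeyBy(SinkShuffleByPk.FORCE, 1, 1, false));
        System.out.println(needsKeyBy(SinkShuffleByPk.FORCE, 4, 4, false));
        System.out.println(needsKeyBy(SinkShuffleByPk.AUTO, 4, 2, true));
    }
}
```

In the diff, this result is then combined with `needMaterialize`: the input transformation is returned unchanged only when neither a key-by nor materialization is required.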
[GitHub] [flink] paul8263 commented on a change in pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …
paul8263 commented on a change in pull request #16108: URL: https://github.com/apache/flink/pull/16108#discussion_r764519555 ## File path: flink-core/src/main/java/org/apache/flink/util/FileLock.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.channels.OverlappingFileLockException; +import java.nio.file.Paths; + +/** A file lock used for avoiding race condition among multiple unit test processes */ +public class FileLock { +private static final long TRY_LOCK_INTERVAL = 50; +private final File file; +private FileOutputStream outputStream; +private java.nio.channels.FileLock lock; + +public FileLock(String fileName) { +this.file = new File(fileName); Review comment: I added a normalization method which only allows letters, slashes and backslashes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
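The normalization method mentioned in the comment is not shown in this message. A minimal sketch of a filter that keeps only letters, slashes and backslashes might look like the following; the class and method names are hypothetical, and both the restriction to ASCII letters and the choice to drop (rather than replace) disallowed characters are assumptions.

```java
/** Hypothetical sketch of a file-name normalization; not the method added in the PR. */
final class FileNameNormalizerSketch {

    /** Removes every character that is not an ASCII letter, '/' or '\'. */
    static String normalize(String fileName) {
        // Character class: anything except A-Z, a-z, forward slash, backslash.
        return fileName.replaceAll("[^A-Za-z/\\\\]", "");
    }

    public static void main(String[] args) {
        System.out.println(normalize("tmp/kafka-test_01.lock"));
    }
}
```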
[GitHub] [flink] flinkbot edited a comment on pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …
flinkbot edited a comment on pull request #16108: URL: https://github.com/apache/flink/pull/16108#issuecomment-856641884 ## CI report: * a696994e8970d98e2f77aa7a4e1db93bb0ecbe53 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27718) * 5f040e7cb9eddcadd58fe0f37047c1fbd872512c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] paul8263 commented on a change in pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …
paul8263 commented on a change in pull request #16108: URL: https://github.com/apache/flink/pull/16108#discussion_r764519129 ## File path: flink-core/src/main/java/org/apache/flink/util/FileLock.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.util; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.channels.OverlappingFileLockException; +import java.nio.file.Paths; + +/** A file lock used for avoiding race condition among multiple unit test processes */ +public class FileLock { +private static final long TRY_LOCK_INTERVAL = 50; +private final File file; +private FileOutputStream outputStream; +private java.nio.channels.FileLock lock; + +public FileLock(String fileName) { +this.file = new File(fileName); +} + +private void init() throws IOException { +String filePath = +Paths.get(System.getProperty("java.io.tmpdir"), file.getName()).toString(); +File lockFile = new File(filePath); +if (!lockFile.exists()) { +lockFile.createNewFile(); +} + +outputStream = new FileOutputStream(lockFile); +} + +public void lock() throws IOException { +if (outputStream == null) { +init(); +} +try { +lock = outputStream.getChannel().lock(); +} catch (OverlappingFileLockException e) { +try { +Thread.sleep(TRY_LOCK_INTERVAL); +} catch (InterruptedException ignored) { +} +lock(); Review comment: Yes, it should be wrapped in a while clause instead of a recursive invocation. I will remove the lock method because we don't need it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
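The point about replacing the recursive retry with a loop can be illustrated as below. This is a sketch, not the revised PR code: `RetryingFileLockSketch` and its members are hypothetical names, and propagating `InterruptedException` instead of swallowing it is a design choice made here for the example.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;

/** Sketch: acquire a file lock with a retry loop instead of recursion. */
final class RetryingFileLockSketch implements AutoCloseable {

    private static final long TRY_LOCK_INTERVAL_MS = 50;

    private final FileOutputStream outputStream;
    private FileLock lock;

    RetryingFileLockSketch(File lockFile) throws IOException {
        // Opening the stream creates the file if it does not exist yet.
        this.outputStream = new FileOutputStream(lockFile);
    }

    /** Blocks until the lock is held; retries while this JVM holds an overlapping lock. */
    void lock() throws IOException, InterruptedException {
        while (lock == null) {
            try {
                lock = outputStream.getChannel().lock();
            } catch (OverlappingFileLockException e) {
                // Another thread in this JVM holds the lock: wait and try again.
                Thread.sleep(TRY_LOCK_INTERVAL_MS);
            }
        }
    }

    boolean isLocked() {
        return lock != null && lock.isValid();
    }

    @Override
    public void close() throws IOException {
        if (lock != null) {
            lock.release();
            lock = null;
        }
        outputStream.close();
    }
}
```

A loop keeps the stack depth constant no matter how long the lock is contended, whereas the recursive version adds one frame per retry and can eventually overflow the stack.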
[GitHub] [flink] paul8263 commented on a change in pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …
paul8263 commented on a change in pull request #16108: URL: https://github.com/apache/flink/pull/16108#discussion_r764518525 ## File path: flink-core/src/main/java/org/apache/flink/util/FileLock.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.channels.OverlappingFileLockException; +import java.nio.file.Paths; + +/** A file lock used for avoiding race condition among multiple unit test processes */ +public class FileLock { Review comment: I will add it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] lincoln-lil commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not t
lincoln-lil commented on a change in pull request #17939: URL: https://github.com/apache/flink/pull/17939#discussion_r764518374 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java ## @@ -232,26 +246,30 @@ private int deriveSinkParallelism( * messages. */ private Transformation applyKeyBy( -ChangelogMode changelogMode, +TableConfig config, Transformation inputTransform, int[] primaryKeys, int sinkParallelism, -boolean upsertMaterialize) { -final int inputParallelism = inputTransform.getParallelism(); -if ((inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT)) -&& !upsertMaterialize) { -return inputTransform; +int inputParallelism, +boolean inputInsertOnly, +boolean needMaterialize) { +boolean sameParallelism = sinkParallelism == inputParallelism; +final ExecutionConfigOptions.SinkShuffleByPk sinkShuffleByPk = + config.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SINK_SHUFFLE_BY_PK); +boolean sinkKeyBy = false; +switch (sinkShuffleByPk) { +case NONE: +break; +case AUTO: +sinkKeyBy = inputInsertOnly && !sameParallelism; +break; +case FORCE: +// single parallelism has no problem +sinkKeyBy = !(sinkParallelism == 1 && inputParallelism == 1); +break; } -if (primaryKeys.length == 0) { -throw new TableException( -String.format( -"The sink for table '%s' has a configured parallelism of %s, while the input parallelism is %s. " -+ "Since the configured parallelism is different from the input's parallelism and " -+ "the changelog mode is not insert-only, a primary key is required but could not " -+ "be found.", - tableSinkSpec.getObjectIdentifier().asSummaryString(), -sinkParallelism, -inputParallelism)); +if (!sinkKeyBy && !needMaterialize) { +return inputTransform; } final RowDataKeySelector selector = Review comment: Good catch! We should not add an empty key selector here (it causes all the data to go into one parallel task), even though the result is correct. 
I'll add a case for this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #17988: [FLINK-25010][Connectors/Hive] Speed up hive's createMRSplits by multi thread
flinkbot edited a comment on pull request #17988:
URL: https://github.com/apache/flink/pull/17988#issuecomment-984363654

## CI report:

* 28b8d68b9819361880a3893eb021688617a2e5fb Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27401)
* 92f5f7c95542c9a08863324d5c885106e554a6c2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27768)

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #17988: [FLINK-25010][Connectors/Hive] Speed up hive's createMRSplits by multi thread
flinkbot edited a comment on pull request #17988:
URL: https://github.com/apache/flink/pull/17988#issuecomment-984363654

## CI report:

* 28b8d68b9819361880a3893eb021688617a2e5fb Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27401)
* 92f5f7c95542c9a08863324d5c885106e554a6c2 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #17988: [FLINK-25010][Connectors/Hive] Speed up hive's createMRSplits by multi thread
flinkbot edited a comment on pull request #17988:
URL: https://github.com/apache/flink/pull/17988#issuecomment-984363654

## CI report:

* 28b8d68b9819361880a3893eb021688617a2e5fb Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27401)
* 8b88d70ff7a9f271d2de0dcaef43b49b310193f2 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[jira] [Closed] (FLINK-24730) Support prompt style in SqlCli
[ https://issues.apache.org/jira/browse/FLINK-24730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yubin Li closed FLINK-24730.
Resolution: Fixed

> Support prompt style in SqlCli
> ------------------------------
>
> Key: FLINK-24730
> URL: https://issues.apache.org/jira/browse/FLINK-24730
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Client
> Affects Versions: 1.15.0
> Reporter: Yubin Li
> Priority: Major
>
> A prompt is a very popular feature in the CLIs of traditional databases such as MySQL.
> It helps users notice the session context, such as the current db/catalog/time, and
> avoid running the wrong commands in an unexpected session.
>
> ||arg||comment||
> |\c|current catalog|
> |\d|current db|
> |\t|current time|
>
> For example:
> FLINK SQL> set sql-client.prompt.style=\c~\d~\t;
> catalog1~db1~2021-11-12 16:00:00> show current catalog;

--
This message was sent by Atlassian Jira (v8.20.1#820001)
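The placeholder table in FLINK-24730 can be read as a plain substitution rule. The following is only a sketch of that rule, not the SQL Client's implementation: the class `PromptStyleSketch`, the method `render`, and its parameters are hypothetical names introduced here for illustration.

```java
import java.util.Map;

/** Hypothetical illustration of the \c, \d, \t placeholders described in the issue. */
public class PromptStyleSketch {

    /** Replaces \c, \d and \t in the pattern; every other character is copied verbatim. */
    public static String render(String pattern, String catalog, String database, String time) {
        Map<Character, String> values = Map.of('c', catalog, 'd', database, 't', time);
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < pattern.length(); i++) {
            char ch = pattern.charAt(i);
            boolean isPlaceholder =
                    ch == '\\'
                            && i + 1 < pattern.length()
                            && values.containsKey(pattern.charAt(i + 1));
            if (isPlaceholder) {
                out.append(values.get(pattern.charAt(i + 1)));
                i++; // skip the placeholder letter that was just consumed
            } else {
                out.append(ch);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Same pattern and values as the example in the issue description.
        String prompt = render("\\c~\\d~\\t", "catalog1", "db1", "2021-11-12 16:00:00");
        System.out.println(prompt + "> "); // prints "catalog1~db1~2021-11-12 16:00:00> "
    }
}
```

Unknown escapes are left untouched in this sketch; how the real client treats them is not stated in the issue.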
[GitHub] [flink] zhuzhurk commented on a change in pull request #17952: [FLINK-25011][runtime] Introduce vertex parallelism decider and the default implementation.
zhuzhurk commented on a change in pull request #17952:
URL: https://github.com/apache/flink/pull/17952#discussion_r764514899

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/VertexParallelismDecider.java ##
@@ -29,17 +29,8 @@
     /**
      * Computing the parallelism.
      *
-     * @param consumedResults The size of consumed blocking results.
+     * @param consumedResults The result info of consumed blocking results.

Review comment:
result info -> information?
[GitHub] [flink] flinkbot edited a comment on pull request #17462: [FLINK-23170] Write metadata after materialization
flinkbot edited a comment on pull request #17462:
URL: https://github.com/apache/flink/pull/17462#issuecomment-942024789

## CI report:

* 459916eaca61c9210c403ee4f4125cfbf3590fa2 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27685)
* ddc8a47426d41b78f448556c5b9a8bbf3ad8c19b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27767)

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #17462: [FLINK-23170] Write metadata after materialization
flinkbot edited a comment on pull request #17462:
URL: https://github.com/apache/flink/pull/17462#issuecomment-942024789

## CI report:

* 459916eaca61c9210c403ee4f4125cfbf3590fa2 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27685)
* ddc8a47426d41b78f448556c5b9a8bbf3ad8c19b UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] curcur commented on pull request #17462: [FLINK-23170] Write metadata after materialization
curcur commented on pull request #17462: URL: https://github.com/apache/flink/pull/17462#issuecomment-988451245 squash commits.
[GitHub] [flink] JingsongLi commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not th
JingsongLi commented on a change in pull request #17939:
URL: https://github.com/apache/flink/pull/17939#discussion_r764512993

## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java ##
@@ -232,26 +246,30 @@ private int deriveSinkParallelism(
      * messages.
      */
     private Transformation applyKeyBy(
-            ChangelogMode changelogMode,
+            TableConfig config,
             Transformation inputTransform,
             int[] primaryKeys,
             int sinkParallelism,
-            boolean upsertMaterialize) {
-        final int inputParallelism = inputTransform.getParallelism();
-        if ((inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT))
-                && !upsertMaterialize) {
-            return inputTransform;
+            int inputParallelism,
+            boolean inputInsertOnly,
+            boolean needMaterialize) {
+        boolean sameParallelism = sinkParallelism == inputParallelism;
+        final ExecutionConfigOptions.SinkShuffleByPk sinkShuffleByPk =
+                config.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SINK_SHUFFLE_BY_PK);
+        boolean sinkKeyBy = false;
+        switch (sinkShuffleByPk) {
+            case NONE:
+                break;
+            case AUTO:
+                sinkKeyBy = inputInsertOnly && !sameParallelism;
+                break;
+            case FORCE:
+                // single parallelism has no problem
+                sinkKeyBy = !(sinkParallelism == 1 && inputParallelism == 1);
+                break;
         }
-        if (primaryKeys.length == 0) {
-            throw new TableException(
-                    String.format(
-                            "The sink for table '%s' has a configured parallelism of %s, while the input parallelism is %s. "
-                                    + "Since the configured parallelism is different from the input's parallelism and "
-                                    + "the changelog mode is not insert-only, a primary key is required but could not "
-                                    + "be found.",
-                            tableSinkSpec.getObjectIdentifier().asSummaryString(),
-                            sinkParallelism,
-                            inputParallelism));
+        if (!sinkKeyBy && !needMaterialize) {
+            return inputTransform;
         }
         final RowDataKeySelector selector =

Review comment:
Does the code reach this point when the table doesn't have a pk? Would that introduce a bug?
[GitHub] [flink] JingsongLi commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not th
JingsongLi commented on a change in pull request #17939:
URL: https://github.com/apache/flink/pull/17939#discussion_r764511680

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java ##
@@ -140,6 +140,25 @@
                                     + "or force materialization(FORCE).")
                     .build());

+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption TABLE_EXEC_SINK_SHUFFLE_BY_PK =
+            key("table.exec.sink.pk-shuffle")

Review comment:
`table.exec.sink.key-by-mode`?
[GitHub] [flink] JingsongLi commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not th
JingsongLi commented on a change in pull request #17939:
URL: https://github.com/apache/flink/pull/17939#discussion_r764511517

## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java ##
@@ -232,26 +246,30 @@ private int deriveSinkParallelism(
      * messages.
      */
     private Transformation applyKeyBy(
-            ChangelogMode changelogMode,
+            TableConfig config,
             Transformation inputTransform,
             int[] primaryKeys,
             int sinkParallelism,
-            boolean upsertMaterialize) {
-        final int inputParallelism = inputTransform.getParallelism();
-        if ((inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT))
-                && !upsertMaterialize) {
-            return inputTransform;
+            int inputParallelism,
+            boolean inputInsertOnly,
+            boolean needMaterialize) {
+        boolean sameParallelism = sinkParallelism == inputParallelism;
+        final ExecutionConfigOptions.SinkShuffleByPk sinkShuffleByPk =
+                config.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SINK_SHUFFLE_BY_PK);
+        boolean sinkKeyBy = false;
+        switch (sinkShuffleByPk) {
+            case NONE:
+                break;
+            case AUTO:
+                sinkKeyBy = inputInsertOnly && !sameParallelism;
+                break;
+            case FORCE:
+                // single parallelism has no problem
+                sinkKeyBy = !(sinkParallelism == 1 && inputParallelism == 1);

Review comment:
`sinkParallelism != 1 || inputParallelism != 1`?
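The suggested condition is equivalent to the one in the diff by De Morgan's law. As a minimal sketch of the decision logic, mirroring the switch as quoted in this review (not necessarily the merged code): `SinkKeyBySketch`, `ShuffleMode`, and `sinkKeyBy` are hypothetical stand-ins for `CommonExecSink`, `ExecutionConfigOptions.SinkShuffleByPk`, and the local variable of the same name.

```java
/** Hypothetical stand-alone version of the key-by decision quoted in the review. */
public class SinkKeyBySketch {

    /** Stand-in for ExecutionConfigOptions.SinkShuffleByPk as shown in the diff. */
    public enum ShuffleMode { NONE, AUTO, FORCE }

    /** Returns whether the sink input should be keyed by the primary key. */
    public static boolean sinkKeyBy(
            ShuffleMode mode, boolean inputInsertOnly, int sinkParallelism, int inputParallelism) {
        switch (mode) {
            case AUTO:
                // Written with the inlined comparison suggested for `!sameParallelism`.
                return inputInsertOnly && sinkParallelism != inputParallelism;
            case FORCE:
                // Suggested form; equal to !(sinkParallelism == 1 && inputParallelism == 1).
                return sinkParallelism != 1 || inputParallelism != 1;
            case NONE:
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        // Exhaustively compare the original and suggested FORCE conditions on a small grid.
        for (int sink = 1; sink <= 3; sink++) {
            for (int input = 1; input <= 3; input++) {
                boolean original = !(sink == 1 && input == 1);
                boolean suggested = sinkKeyBy(ShuffleMode.FORCE, false, sink, input);
                if (original != suggested) {
                    throw new AssertionError("mismatch at sink=" + sink + ", input=" + input);
                }
            }
        }
        System.out.println("FORCE conditions agree");
    }
}
```

The only combination for which FORCE skips the key-by is parallelism 1 on both sides, which matches the "single parallelism has no problem" comment in the diff.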
[GitHub] [flink] JingsongLi commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not th
JingsongLi commented on a change in pull request #17939:
URL: https://github.com/apache/flink/pull/17939#discussion_r764511367

## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java ##
@@ -232,26 +246,30 @@ private int deriveSinkParallelism(
      * messages.
      */
     private Transformation applyKeyBy(
-            ChangelogMode changelogMode,
+            TableConfig config,
             Transformation inputTransform,
             int[] primaryKeys,
             int sinkParallelism,
-            boolean upsertMaterialize) {
-        final int inputParallelism = inputTransform.getParallelism();
-        if ((inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT))
-                && !upsertMaterialize) {
-            return inputTransform;
+            int inputParallelism,
+            boolean inputInsertOnly,
+            boolean needMaterialize) {
+        boolean sameParallelism = sinkParallelism == inputParallelism;
+        final ExecutionConfigOptions.SinkShuffleByPk sinkShuffleByPk =
+                config.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SINK_SHUFFLE_BY_PK);
+        boolean sinkKeyBy = false;
+        switch (sinkShuffleByPk) {
+            case NONE:
+                break;
+            case AUTO:
+                sinkKeyBy = inputInsertOnly && !sameParallelism;
+                break;
+            case FORCE:

Review comment:
Ignore me. This already matches my expectation.
[GitHub] [flink] JingsongLi commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not th
JingsongLi commented on a change in pull request #17939:
URL: https://github.com/apache/flink/pull/17939#discussion_r764511041

## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java ##
@@ -232,26 +246,30 @@ private int deriveSinkParallelism(
      * messages.
      */
     private Transformation applyKeyBy(
-            ChangelogMode changelogMode,
+            TableConfig config,
             Transformation inputTransform,
             int[] primaryKeys,
             int sinkParallelism,
-            boolean upsertMaterialize) {
-        final int inputParallelism = inputTransform.getParallelism();
-        if ((inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT))
-                && !upsertMaterialize) {
-            return inputTransform;
+            int inputParallelism,
+            boolean inputInsertOnly,
+            boolean needMaterialize) {
+        boolean sameParallelism = sinkParallelism == inputParallelism;
+        final ExecutionConfigOptions.SinkShuffleByPk sinkShuffleByPk =
+                config.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SINK_SHUFFLE_BY_PK);
+        boolean sinkKeyBy = false;
+        switch (sinkShuffleByPk) {
+            case NONE:
+                break;
+            case AUTO:
+                sinkKeyBy = inputInsertOnly && !sameParallelism;

Review comment:
`!sameParallelism` => `sinkParallelism != inputParallelism`