[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation

2021-12-07 Thread GitBox


yunfengzhou-hub commented on a change in pull request #32:
URL: https://github.com/apache/flink-ml/pull/32#discussion_r764623110



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/naivebayes/NaiveBayesModelData.java
##
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.naivebayes;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The model data of {@link NaiveBayesModel}.
+ *
+ * This class also provides methods to convert model data between Table and Datastream, and
+ * classes to save/load model data.
+ */
+public class NaiveBayesModelData implements Serializable {
+    public final Map[][] theta;
+    public final double[] piArray;
+    public final int[] labels;
+
+    public NaiveBayesModelData(Map[][] theta, double[] piArray, int[] labels) {
+        this.theta = theta;
+        this.piArray = piArray;
+        this.labels = labels;
+    }
+
+    /** Converts the provided modelData Datastream into corresponding Table. */
+    public static Table getModelDataTable(DataStream<NaiveBayesModelData> stream) {
+        StreamTableEnvironment tEnv =
+                StreamTableEnvironment.create(stream.getExecutionEnvironment());
+        return tEnv.fromDataStream(stream);
+    }
+
+    /** Converts the provided modelData Table into corresponding DataStream. */
+    public static DataStream<NaiveBayesModelData> getModelDataStream(Table table) {
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) table).getTableEnvironment();
+        return tEnv.toDataStream(table)
+                .map(
+                        (MapFunction<Row, NaiveBayesModelData>)
+                                row -> (NaiveBayesModelData) row.getField("f0"));
+    }
+
+    /** Encoder for the {@link NaiveBayesModelData}. */
+    public static class ModelDataEncoder implements Encoder<NaiveBayesModelData> {
+        @Override
+        public void encode(NaiveBayesModelData modelData, OutputStream outputStream) {
+            Output output = new Output(outputStream);
+
+            output.writeInt(modelData.labels.length);

Review comment:
   OK. I'll make the change.
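The encoder in the hunk begins with `output.writeInt(modelData.labels.length)`, i.e. a length-prefixed layout: each array is preceded by its size so the decoder knows how many elements to read back. As a minimal sketch of that idea only, the block below round-trips `labels` and `piArray` with `java.io.DataOutputStream`/`DataInputStream` in place of Kryo's `Output`/`Input`; it leaves out `theta`, and the class name `LengthPrefixedModelCodec` is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

/** Hypothetical sketch: length-prefixed encoding of the array fields of a NaiveBayes-style model. */
final class LengthPrefixedModelCodec {

    /** Decoded arrays, in the order they were written. */
    static final class Decoded {
        final int[] labels;
        final double[] piArray;

        Decoded(int[] labels, double[] piArray) {
            this.labels = labels;
            this.piArray = piArray;
        }
    }

    /** Writes each array as an int length followed by its elements. */
    static byte[] encode(int[] labels, double[] piArray) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(labels.length); // length prefix: how many ints follow
            for (int label : labels) {
                out.writeInt(label);
            }
            out.writeInt(piArray.length); // length prefix: how many doubles follow
            for (double pi : piArray) {
                out.writeDouble(pi);
            }
        } catch (IOException e) {
            // Writing to a byte array does not fail in practice; rethrow unchecked.
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the arrays back; the read order must mirror the write order exactly. */
    static Decoded decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int[] labels = new int[in.readInt()];
            for (int i = 0; i < labels.length; i++) {
                labels[i] = in.readInt();
            }
            double[] piArray = new double[in.readInt()];
            for (int i = 0; i < piArray.length; i++) {
                piArray[i] = in.readDouble();
            }
            return new Decoded(labels, piArray);
        } catch (IOException e) {
            // Truncated or malformed input surfaces here (e.g. EOFException).
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Decoded decoded = decode(encode(new int[] {0, 1}, new double[] {-0.69, -0.69}));
        System.out.println(Arrays.toString(decoded.labels) + " " + Arrays.toString(decoded.piArray));
    }
}
```

Whatever the stream API, the decoder has to read the fields in exactly the order the encoder wrote them.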




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #16582: [FLINK-21504][checkpoint] Introduce notification of subsumed checkpoint

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #16582:
URL: https://github.com/apache/flink/pull/16582#issuecomment-885598535


   
   ## CI report:
   
   * 4575a58b2365b9ab070606108ef6ba1b4507dc24 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27764)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #17692: [FLINK-24489][CEP] The size of entryCache & eventsBufferCache in the SharedBuffer should be defined with a threshold to limit the num

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #17692:
URL: https://github.com/apache/flink/pull/17692#issuecomment-961748638


   
   ## CI report:
   
   * 5378879134e1c08ada5239c650d2ce77cff0f3e3 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27776)
 
   
   
   






[GitHub] [flink] xiangqiao123 commented on a change in pull request #17952: [FLINK-25011][runtime] Introduce vertex parallelism decider and the default implementation.

2021-12-07 Thread GitBox


xiangqiao123 commented on a change in pull request #17952:
URL: https://github.com/apache/flink/pull/17952#discussion_r764615819



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDecider.java
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link VertexParallelismDecider}. */
+public class DefaultVertexParallelismDecider implements VertexParallelismDecider {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DefaultVertexParallelismDecider.class);
+
+    /**
+     * The cap ratio of broadcast bytes to data volume per task. The cap ratio is 0.5 currently
+     * because we usually expect the broadcast dataset to be smaller than non-broadcast. We can make
+     * it configurable later if we see users requesting for it.
+     */
+    private static final double CAP_RATIO_OF_BROADCAST = 0.5;
+
+    private final int maxParallelism;
+    private final int minParallelism;
+    private final long dataVolumePerTask;
+    private final int defaultSourceParallelism;
+
+    private DefaultVertexParallelismDecider(
+            int maxParallelism,
+            int minParallelism,
+            MemorySize dataVolumePerTask,
+            int defaultSourceParallelism) {
+
+        checkArgument(minParallelism > 0, "Invalid minimum parallelism.");
+        checkArgument(
+                maxParallelism >= minParallelism,
+                "Maximum parallelism should greater than minimum parallelism.");

Review comment:
   Maximum parallelism should be greater than or equal to minimum parallelism.
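The javadoc on `CAP_RATIO_OF_BROADCAST` implies that broadcast input should take at most half of a task's data budget. A hedged sketch of how a decider could apply such a cap and then clamp the result to `[minParallelism, maxParallelism]`; the formula, the class `ParallelismDecisionSketch` and its method are assumptions for illustration, not the PR's implementation. Its range check uses a grammatical form of the message discussed in this comment.

```java
/**
 * Hypothetical sketch (not the PR's code): derive a vertex parallelism from consumed bytes,
 * capping the broadcast share of each task's budget and clamping to [min, max].
 */
final class ParallelismDecisionSketch {

    /** Broadcast data may use at most this fraction of a task's data volume (the 0.5 in the hunk). */
    static final double CAP_RATIO_OF_BROADCAST = 0.5;

    static int decide(
            long broadcastBytes,
            long nonBroadcastBytes,
            long dataVolumePerTask,
            int minParallelism,
            int maxParallelism) {
        if (minParallelism <= 0) {
            throw new IllegalArgumentException("Invalid minimum parallelism.");
        }
        if (maxParallelism < minParallelism) {
            throw new IllegalArgumentException(
                    "Maximum parallelism should be greater than or equal to minimum parallelism.");
        }
        if (dataVolumePerTask <= 0) {
            throw new IllegalArgumentException("Data volume per task must be positive.");
        }
        // Every consumer task reads all broadcast data, so it eats into each task's budget; cap it.
        long maxBroadcastBytes = (long) (dataVolumePerTask * CAP_RATIO_OF_BROADCAST);
        long effectiveBroadcast = Math.min(broadcastBytes, maxBroadcastBytes);
        // The rest of the per-task budget is left for non-broadcast input (always >= 1 here).
        long remainingPerTask = dataVolumePerTask - effectiveBroadcast;
        long initial = (nonBroadcastBytes + remainingPerTask - 1) / remainingPerTask; // ceiling division
        return (int) Math.max(minParallelism, Math.min(initial, maxParallelism));
    }

    public static void main(String[] args) {
        // 80 broadcast bytes, 1000 non-broadcast bytes, 100-byte budget per task, range [1, 128].
        System.out.println(decide(80, 1000, 100, 1, 128));
    }
}
```

Without the cap, a large broadcast input could shrink the per-task budget toward zero and push the parallelism to the maximum; capping keeps at least half of each task's budget for non-broadcast data.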








[GitHub] [flink] flinkbot edited a comment on pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not the same

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #17939:
URL: https://github.com/apache/flink/pull/17939#issuecomment-981420515


   
   ## CI report:
   
   * a124edc8f7930c1f9a7078418b2117646e464a0e Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27773)
 
   
   
   






[jira] [Closed] (FLINK-18165) When savingpoint is restored, select the checkpoint directory and stateBackend

2021-12-07 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-18165.

Resolution: Fixed

This ticket has been resolved by FLINK-20976

> When savingpoint is restored, select the checkpoint directory and stateBackend
> --
>
> Key: FLINK-18165
> URL: https://issues.apache.org/jira/browse/FLINK-18165
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.9.0
> Environment: flink 1.9
>Reporter: Xinyuan Liu
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> If a checkpoint is used as the initial state when starting from a savepoint, 
> the state backend must be of the same type before and after the restore. In 
> production, however, state keeps growing; once TaskManager memory is 
> insufficient and the cluster cannot be expanded, the state backend has to be 
> switched, and data consistency must still be guaranteed. Unfortunately, Flink 
> currently provides no elegant way to switch state backends. Could the 
> community consider this proposal?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] flinkbot edited a comment on pull request #17655: [FLINK-24728][tests] Add tests to ensure SQL file sink closes all created files

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #17655:
URL: https://github.com/apache/flink/pull/17655#issuecomment-958717470


   
   ## CI report:
   
   * b3867f269b1f83df6e44493def7c78a18e962a5d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26396)
 
   * 558b57cbfaf1a062c49b74682f905cc65b06f2c3 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27783)
 
   
   
   






[jira] [Closed] (FLINK-19008) Flink Job runs slow after restore + downscale from an incremental checkpoint (rocksdb)

2021-12-07 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-19008.

Resolution: Information Provided

Flink has bumped the RocksDB version to 6.20.3, which uses this compaction 
priority by default. However, I don't think this change will benefit 
performance much. Let's see whether the problem still exists after Flink 1.14.

> Flink Job runs slow after restore + downscale from an incremental checkpoint 
> (rocksdb)
> --
>
> Key: FLINK-19008
> URL: https://issues.apache.org/jira/browse/FLINK-19008
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Jun Qin
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> perfomance, usability
>
> A customer runs a Flink job with the RocksDB state backend. Checkpoints are 
> retained and taken incrementally. The state size is several TB. When they 
> restore + downscale from a retained checkpoint, although downloading the 
> checkpoint files took ~20min, the job throughput returns to the expected 
> level only after 3 hours.
> I do not have RocksDB logs. The suspected cause of those 3 hours is heavy 
> RocksDB compaction and/or flushing, as checkpoints were observed not to 
> finish fast enough due to a long {{checkpoint duration (sync)}}. How can we 
> make this restoring phase shorter?
> For compaction, I think it is worth checking the improvement from:
> {code:c}
> CompactionPri compaction_pri = kMinOverlappingRatio;{code}
> which has been set as the default in RocksDB 6.x:
> {code:c}
> // In Level-based compaction, it Determines which file from a level to be
> // picked to merge to the next level. We suggest people try
> // kMinOverlappingRatio first when you tune your database.
> enum CompactionPri : char {
>   // Slightly prioritize larger files by size compensated by #deletes
>   kByCompensatedSize = 0x0,
>   // First compact files whose data's latest update time is oldest.
>   // Try this if you only update some hot keys in small ranges.
>   kOldestLargestSeqFirst = 0x1,
>   // First compact files whose range hasn't been compacted to the next level
>   // for the longest. If your updates are random across the key space,
>   // write amplification is slightly better with this option.
>   kOldestSmallestSeqFirst = 0x2,
>   // First compact files whose ratio between overlapping size in next level
>   // and its size is the smallest. It in many cases can optimize write
>   // amplification.
>   kMinOverlappingRatio = 0x3,
> };
> ...
> // Default: kMinOverlappingRatio  
> CompactionPri compaction_pri = kMinOverlappingRatio;{code}
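The `kMinOverlappingRatio` comment describes a greedy rule: compact first the file whose overlapping size in the next level is smallest relative to its own size. A simplified Java sketch of only that selection step; the `FileMeta` type and its fields are hypothetical, and RocksDB's actual picker works on its own file metadata and size accounting, which is not modeled here.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified model of the kMinOverlappingRatio pick: lowest next-level overlap per byte of the file. */
final class MinOverlappingRatioSketch {

    /** Hypothetical per-file metadata; real RocksDB metadata is much richer. */
    static final class FileMeta {
        final String name;
        final long sizeBytes;
        final long overlappingBytesInNextLevel;

        FileMeta(String name, long sizeBytes, long overlappingBytesInNextLevel) {
            this.name = name;
            this.sizeBytes = sizeBytes;
            this.overlappingBytesInNextLevel = overlappingBytesInNextLevel;
        }
    }

    /** Returns the candidate with the smallest overlap/size ratio, or null if there are none. */
    static FileMeta pick(List<FileMeta> candidates) {
        FileMeta best = null;
        double bestRatio = Double.POSITIVE_INFINITY;
        for (FileMeta file : candidates) {
            // Next-level bytes rewritten per byte moved down; a lower ratio means less write amplification.
            double ratio = (double) file.overlappingBytesInNextLevel / Math.max(1L, file.sizeBytes);
            if (ratio < bestRatio) {
                bestRatio = ratio;
                best = file;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<FileMeta> level1 = Arrays.asList(
                new FileMeta("a.sst", 100, 400),
                new FileMeta("b.sst", 100, 50),
                new FileMeta("c.sst", 200, 300));
        System.out.println(pick(level1).name);
    }
}
```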





[GitHub] [flink] flinkbot edited a comment on pull request #17655: [FLINK-24728][tests] Add tests to ensure SQL file sink closes all created files

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #17655:
URL: https://github.com/apache/flink/pull/17655#issuecomment-958717470


   
   ## CI report:
   
   * b3867f269b1f83df6e44493def7c78a18e962a5d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26396)
 
   * 558b57cbfaf1a062c49b74682f905cc65b06f2c3 UNKNOWN
   
   
   






[GitHub] [flink] flinkbot edited a comment on pull request #18050: [FLINK-25034][runtime] Support flexible number of subpartitions in IntermediateResultPartition

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #18050:
URL: https://github.com/apache/flink/pull/18050#issuecomment-988466514


   
   ## CI report:
   
   * 5e36ba1eacf427ca4665de18b6f71253fb53f0e8 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27771)
 
   * daeb5ff033438d2aa6f314a756ca9f1c5f5838b5 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27778)
 
   
   
   






[jira] [Closed] (FLINK-23532) Unify stop-with-savepoint w drain and w/o drain

2021-12-07 Thread Dawid Wysakowicz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz closed FLINK-23532.

Resolution: Implemented

Implemented in 72a2471fdb08a625cbe173ef89b53db8425a14b6..2b167ae9764c02d4c77a41f2afd056ed8fd5f04f

> Unify stop-with-savepoint w drain and w/o drain
> ---
>
> Key: FLINK-23532
> URL: https://issues.apache.org/jira/browse/FLINK-23532
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Task
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> The code paths for stop-with-savepoint with --drain and without it have 
> diverged since FLINK-23408. We should unify the two, and in both cases wait 
> for the savepoint in afterInvoke and close the sources before triggering the 
> last checkpoint.





[GitHub] [flink] flinkbot edited a comment on pull request #18014: [FLINK-24857][test][Kafka] Upgrade SourceReaderTestBase t…

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #18014:
URL: https://github.com/apache/flink/pull/18014#issuecomment-986441468


   
   ## CI report:
   
   * a00cd7df6de0a7db9dc0517a8eeb9e0cd8465a6a Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27723)
 
   * d21e6dcc5ea43433e5efe0b46a8dca9c520b Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27781)
 
   
   
   






[GitHub] [flink] dawidwys merged pull request #17968: [FLINK-23532] Unify stop-with-savepoint without and with drain

2021-12-07 Thread GitBox


dawidwys merged pull request #17968:
URL: https://github.com/apache/flink/pull/17968


   






[GitHub] [flink] flinkbot edited a comment on pull request #18014: [FLINK-24857][test][Kafka] Upgrade SourceReaderTestBase t…

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #18014:
URL: https://github.com/apache/flink/pull/18014#issuecomment-986441468


   
   ## CI report:
   
   * a00cd7df6de0a7db9dc0517a8eeb9e0cd8465a6a Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27723)
 
   * d21e6dcc5ea43433e5efe0b46a8dca9c520b UNKNOWN
   
   
   






[jira] [Commented] (FLINK-20996) Using a cryptographically weak Pseudo Random Number Generator (PRNG)

2021-12-07 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17455009#comment-17455009
 ] 

Yun Tang commented on FLINK-20996:
--

[~yaxiao] AbstractTtlStateVerifier is just a class for testing rather than for 
running in a production environment, and I have no idea why it would be 
attacked in a security context.

> Using a cryptographically weak Pseudo Random Number Generator (PRNG)
> 
>
> Key: FLINK-20996
> URL: https://issues.apache.org/jira/browse/FLINK-20996
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ya Xiao
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> We are a security research team at Virginia Tech conducting an empirical 
> study of the usefulness of existing security vulnerability detection 
> tools. The following vulnerability was reported by some of these tools. We 
> would greatly appreciate any feedback on it.
> *Vulnerability Description:*
> The file 
> [flink/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java|https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java]
>  uses java.util.Random instead of java.security.SecureRandom at line 39.
> *Security Impact:*
> java.util.Random is not cryptographically strong and may expose sensitive 
> information to certain types of attacks when used in a security context.
> *Useful Resources*:
> [https://cwe.mitre.org/data/definitions/338.html]
> *Solution we suggest:*
> Replace it with SecureRandom
> *Please share your opinions or comments, if any:*
> Is the bug report helpful?
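For context on the suggested fix: `java.security.SecureRandom` is a subclass of `java.util.Random`, so the swap itself would be a one-line change. For a test-only verifier, though, a seeded `java.util.Random` has the advantage that runs are reproducible, which `SecureRandom` does not guarantee. The helper class below is hypothetical and only contrasts the two.

```java
import java.security.SecureRandom;
import java.util.Random;

/** Hypothetical helper contrasting the two generators discussed in FLINK-20996. */
final class RandomSourceSketch {

    /** Seeded java.util.Random: the same seed yields the same sequence, keeping test runs reproducible. */
    static Random reproducibleForTests(long seed) {
        return new Random(seed);
    }

    /** SecureRandom extends java.util.Random, so it can be used wherever a Random is expected. */
    static Random forSecuritySensitiveValues() {
        return new SecureRandom();
    }

    public static void main(String[] args) {
        System.out.println(reproducibleForTests(42).nextInt(100) == reproducibleForTests(42).nextInt(100));
        System.out.println(forSecuritySensitiveValues().nextInt(100));
    }
}
```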





[GitHub] [flink] JingsongLi commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not th

2021-12-07 Thread GitBox


JingsongLi commented on a change in pull request #17939:
URL: https://github.com/apache/flink/pull/17939#discussion_r764598800



##
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##
@@ -150,6 +150,25 @@
                                        + "or force materialization(FORCE).")
                         .build());
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<SinkKeyedShuffle> TABLE_EXEC_SINK_KEYED_SHUFFLE =
+            key("table.exec.sink.keyed-shuffle")
+                    .enumType(SinkKeyedShuffle.class)
+                    .defaultValue(SinkKeyedShuffle.AUTO)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "In order to minimize the distributed disorder problem when writing data into table with primary keys that many users suffers. "
+                                                    + "FLINK will auto add a keyed shuffle by default when the sink's parallelism differs from upstream operator and upstream is append only. "
+                                                    + "This works only when the upstream ensures the multi-records' order on the primary key, if not, the added shuffle can not solve "

Review comment:
   `the multi-records' order`?
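On the reviewer's question: "the multi-records' order on the primary key" presumably means that multiple records sharing one primary key arrive in the correct order. A keyed shuffle preserves that per-key order because every record with a given key is routed to the same sink subtask, but it cannot repair an order the upstream never had. A hedged sketch of the decision described in the option text plus the routing; the `NONE`/`FORCE` values, their semantics, and all names here are assumptions (only `AUTO` appears in the hunk), and plain hash-modulo routing stands in for Flink's key-group partitioning.

```java
import java.util.Objects;

/** Hypothetical sketch of the keyed-shuffle rule described in the option text. */
final class SinkKeyedShuffleSketch {

    /** Assumed option values; only AUTO appears in the quoted hunk. */
    enum SinkKeyedShuffle { NONE, AUTO, FORCE }

    /** Decides whether to insert a shuffle keyed on the sink's primary key. */
    static boolean addKeyedShuffle(
            SinkKeyedShuffle mode,
            boolean sinkHasPrimaryKey,
            boolean upstreamAppendOnly,
            int upstreamParallelism,
            int sinkParallelism) {
        if (!sinkHasPrimaryKey || mode == SinkKeyedShuffle.NONE) {
            return false; // nothing to key by, or the user disabled the shuffle
        }
        if (mode == SinkKeyedShuffle.FORCE) {
            return true; // assumption: FORCE always adds the shuffle when a primary key exists
        }
        // AUTO: only when the parallelism changes at the sink and the upstream is append-only.
        return upstreamAppendOnly && upstreamParallelism != sinkParallelism;
    }

    /** Routes a row by its primary key, so all changes to one key reach the same sink subtask. */
    static int targetSubtask(Object primaryKey, int sinkParallelism) {
        return Math.floorMod(Objects.hashCode(primaryKey), sinkParallelism);
    }

    public static void main(String[] args) {
        System.out.println(addKeyedShuffle(SinkKeyedShuffle.AUTO, true, true, 4, 2));
        System.out.println(targetSubtask("user-42", 2));
    }
}
```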








[GitHub] [flink] flinkbot edited a comment on pull request #17988: [FLINK-25010][Connectors/Hive] Speed up hive's createMRSplits by multi thread

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #17988:
URL: https://github.com/apache/flink/pull/17988#issuecomment-984363654


   
   ## CI report:
   
   * 92f5f7c95542c9a08863324d5c885106e554a6c2 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27768)
 
   
   
   






[GitHub] [flink] flinkbot edited a comment on pull request #17462: [FLINK-23170] Write metadata after materialization

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #17462:
URL: https://github.com/apache/flink/pull/17462#issuecomment-942024789


   
   ## CI report:
   
   * ddc8a47426d41b78f448556c5b9a8bbf3ad8c19b Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27767)
 
   
   
   






[jira] [Closed] (FLINK-21726) Fix checkpoint stuck

2021-12-07 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang closed FLINK-21726.

Fix Version/s: (was: 1.15.0)
   (was: 1.14.1)
   Resolution: Information Provided

Since we have already bumped the RocksDB version, the fix for this problem is 
already included.

> Fix checkpoint stuck
> 
>
> Key: FLINK-21726
> URL: https://issues.apache.org/jira/browse/FLINK-21726
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.11.3, 1.12.2, 1.13.0
>Reporter: future
>Priority: Minor
>  Labels: auto-deprioritized-critical, auto-deprioritized-major
>
> h1. 1. Bug description:
> During a RocksDB checkpoint, the checkpoint may get stuck in the 
> `WaitUntilFlushWouldNotStallWrites` method.
> h1. 2. Simple analysis of the reasons:
> h2. 2.1 Configuration parameters:
>  
> {code:java}
> # Flink yaml:
> state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM
> state.backend.rocksdb.compaction.style: UNIVERSAL
> # corresponding RocksDB config
> Compaction Style : Universal 
> max_write_buffer_number : 4
> min_write_buffer_number_to_merge : 3{code}
> Checkpoints are usually very fast. When a checkpoint is executed, 
> `WaitUntilFlushWouldNotStallWrites` is called. If there are 2 immutable 
> MemTables, fewer than `min_write_buffer_number_to_merge`, they will not be 
> flushed, but execution still enters this code:
>  
> {code:java}
> // method: GetWriteStallConditionAndCause
> if (mutable_cf_options.max_write_buffer_number> 3 &&
>   num_unflushed_memtables >=
>   mutable_cf_options.max_write_buffer_number-1) {
>  return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
> }
> {code}
> code link: 
> [https://github.com/facebook/rocksdb/blob/fbed72f03c3d9e4fdca3e5993587ef2559ba6ab9/db/column_family.cc#L847]
> The checkpoint assumes a FlushJob is pending, but there is none, so it waits forever.
> h2. 2.2 solution:
> Add a condition: wait only when the `number of immutable MemTables` >= 
> `min_write_buffer_number_to_merge`.
> The rocksdb community has fixed this bug, link: 
> [https://github.com/facebook/rocksdb/pull/7921]
> h2. 2.3 Code that can reproduce the bug:
> [https://github.com/1996fanrui/fanrui-learning/blob/flink-1.12/module-java/src/main/java/com/dream/rocksdb/RocksDBCheckpointStuck.java]
> h1. 3. Interesting point
> This bug is triggered only when `the number of sorted runs >= 
> level0_file_num_compaction_trigger`, because of the following break in 
> WaitUntilFlushWouldNotStallWrites:
> {code:java}
> if (cfd->imm()->NumNotFlushed() <
> cfd->ioptions()->min_write_buffer_number_to_merge &&
> vstorage->l0_delay_trigger_count() <
> mutable_cf_options.level0_file_num_compaction_trigger) {
>   break;
> }
> {code}
> code link: 
> [https://github.com/facebook/rocksdb/blob/fbed72f03c3d9e4fdca3e5993587ef2559ba6ab9/db/db_impl/db_impl_compaction_flush.cc#L1974]
> Universal compaction may have `l0_delay_trigger_count() >= 
> level0_file_num_compaction_trigger`, which is how this bug is triggered.
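To make the interaction concrete, the Java sketch below transcribes the two quoted conditions and the guard proposed in section 2.2. Assumptions: the wait loop evaluates the stall condition with one extra unflushed memtable (`NumNotFlushed() + 1`), only the memtable-limit branch of the stall check is modeled, and all names are hypothetical; this is an illustration, not RocksDB's code.

```java
/**
 * Hypothetical Java transcription of the RocksDB conditions quoted in FLINK-21726.
 * Only the memtable-limit branch of the stall check is modeled.
 */
final class FlushWaitSketch {

    /** Mirrors the quoted branch of GetWriteStallConditionAndCause (kDelayed, kMemtableLimit). */
    static boolean memtableLimitDelayed(int maxWriteBufferNumber, int numUnflushedMemtables) {
        return maxWriteBufferNumber > 3 && numUnflushedMemtables >= maxWriteBufferNumber - 1;
    }

    /** Mirrors the quoted early break in WaitUntilFlushWouldNotStallWrites. */
    static boolean breaksEarly(
            int numImmNotFlushed,
            int minWriteBufferNumberToMerge,
            int l0DelayTriggerCount,
            int level0FileNumCompactionTrigger) {
        return numImmNotFlushed < minWriteBufferNumberToMerge
                && l0DelayTriggerCount < level0FileNumCompactionTrigger;
    }

    /**
     * Returns true if the checkpoint would keep waiting for a flush. Assumption: the stall
     * check sees numImmNotFlushed + 1 memtables, as if the active memtable had been switched out.
     */
    static boolean keepsWaiting(
            int numImmNotFlushed,
            int maxWriteBufferNumber,
            int minWriteBufferNumberToMerge,
            int l0DelayTriggerCount,
            int level0FileNumCompactionTrigger,
            boolean withProposedGuard) {
        if (breaksEarly(numImmNotFlushed, minWriteBufferNumberToMerge,
                l0DelayTriggerCount, level0FileNumCompactionTrigger)) {
            return false;
        }
        // Guard proposed in the issue: with too few immutable memtables no flush is scheduled,
        // so waiting for one could never end.
        if (withProposedGuard && numImmNotFlushed < minWriteBufferNumberToMerge) {
            return false;
        }
        return memtableLimitDelayed(maxWriteBufferNumber, numImmNotFlushed + 1);
    }

    public static void main(String[] args) {
        // From the issue: max_write_buffer_number = 4, min_write_buffer_number_to_merge = 3,
        // 2 immutable memtables. The L0 values (4 and 4) are hypothetical, with the count at the trigger.
        System.out.println(keepsWaiting(2, 4, 3, 4, 4, false)); // true: waits forever
        System.out.println(keepsWaiting(2, 4, 3, 4, 4, true)); // false: the guard lets it return
    }
}
```

With `l0_delay_trigger_count` below the trigger, `breaksEarly` already returns true, which matches the observation that the bug only shows up once the number of sorted runs reaches `level0_file_num_compaction_trigger`.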





[jira] [Commented] (FLINK-22962) Key group is not in KeyGroupRange error while checkpointing

2021-12-07 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17454991#comment-17454991
 ] 

Yun Tang commented on FLINK-22962:
--

[~prateekkohli2112] Since Flink 1.13, the savepoint format of the RocksDB and 
heap keyed state backends has been unified. Do you still face this problem?



> Key group is not in KeyGroupRange error while checkpointing
> ---
>
> Key: FLINK-22962
> URL: https://issues.apache.org/jira/browse/FLINK-22962
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.12.1
> Environment: Linux
>Reporter: Prateek Kohli
>Priority: Major
>
> Hi,
>  
> We are getting the exception below at checkpoint time when using RocksDB as 
> the state backend:
> 2021-06-10 12:05:13,933 INFO org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Aggregator (3/4)#0 - asynchronous part of checkpoint 2 could not be completed.
> java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange\{startKeyGroup=5, endKeyGroup=7}.
>  at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_261]
>  at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_261]
>  at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:621) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>  at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>  at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:122) [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_261]
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_261]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
> Caused by: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange\{startKeyGroup=5, endKeyGroup=7}.
>  at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:144) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>  at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:106) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>  at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:333) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>  at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:264) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>  at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:227) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>  at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:180) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>  at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_261]
>  at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:618) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>  ... 5 more
>  
> When we switch the state backend to file or heap, we do not get this error.
>  
>  
>  
>  
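The key-group arithmetic behind this error can be sketched as follows. This is a re-derivation of the formulas we understand Flink's `KeyGroupRangeAssignment` to use (treat it as an assumption), and `KeyGroupMath` is a hypothetical name. With parallelism 4, `Aggregator (3/4)` is subtask index 2; the range 5..7 in the message is consistent with a max parallelism of 10 (inferred, not stated in the report), under which key group 0 belongs to subtask 0. That suggests subtask 2's RocksDB instance held state tagged with a key group it does not own.

```java
/**
 * Hypothetical helper re-deriving Flink's key-group arithmetic.
 * Assumed to mirror KeyGroupRangeAssignment; not copied from Flink.
 */
public final class KeyGroupMath {

    private KeyGroupMath() {}

    /** Returns {start, end} (both inclusive) of the key groups owned by one subtask. */
    public static int[] keyGroupRange(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Returns the index of the subtask that owns the given key group. */
    public static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 10; // assumption inferred from the error message
        int parallelism = 4;
        for (int i = 0; i < parallelism; i++) {
            int[] r = keyGroupRange(maxParallelism, parallelism, i);
            System.out.printf("subtask %d owns key groups %d..%d%n", i, r[0], r[1]);
        }
        System.out.println("key group 0 -> subtask "
                + operatorIndexForKeyGroup(maxParallelism, parallelism, 0));
    }
}
```

With these values, `main` prints the ranges 0..2, 3..4, 5..7 and 8..9 for subtasks 0 to 3, and maps key group 0 to subtask 0.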





[GitHub] [flink] flinkbot edited a comment on pull request #18049: [FLINK-24813][table-planner-blink] Improve ImplicitTypeConversionITCase

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #18049:
URL: https://github.com/apache/flink/pull/18049#issuecomment-988465299


   
   ## CI report:
   
   * da40e9da154b7273f6db5eb12d65c983636cd1e6 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27770)
 
   *  Unknown: [CANCELED](TBD) 
   * e060cdbc3f026f9d242e7d6f3d0f03496a4ffe24 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27780)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #17952: [FLINK-25011][runtime] Introduce vertex parallelism decider and the default implementation.

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #17952:
URL: https://github.com/apache/flink/pull/17952#issuecomment-982356774


   
   ## CI report:
   
   * c18948fda6752079a297579c26db0316376390bd Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27720)
 
   * d3dd231a393793dabc3a1f218981119f8e4259d7 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27779)
 
   
   




[GitHub] [flink] flinkbot edited a comment on pull request #18049: [FLINK-24813][table-planner-blink] Improve ImplicitTypeConversionITCase

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #18049:
URL: https://github.com/apache/flink/pull/18049#issuecomment-988465299


   
   ## CI report:
   
   * da40e9da154b7273f6db5eb12d65c983636cd1e6 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27770)
 
   *  Unknown: [CANCELED](TBD) 
   * e060cdbc3f026f9d242e7d6f3d0f03496a4ffe24 UNKNOWN
   
   




[GitHub] [flink] flinkbot edited a comment on pull request #17952: [FLINK-25011][runtime] Introduce vertex parallelism decider and the default implementation.

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #17952:
URL: https://github.com/apache/flink/pull/17952#issuecomment-982356774


   
   ## CI report:
   
   * c18948fda6752079a297579c26db0316376390bd Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27720)
 
   * d3dd231a393793dabc3a1f218981119f8e4259d7 UNKNOWN
   
   




[GitHub] [flink] xuyangzhong commented on pull request #18049: [FLINK-24813][table-planner-blink] Improve ImplicitTypeConversionITCase

2021-12-07 Thread GitBox


xuyangzhong commented on pull request #18049:
URL: https://github.com/apache/flink/pull/18049#issuecomment-988539996


   @flinkbot run azure






[GitHub] [flink] wanglijie95 commented on a change in pull request #17952: [FLINK-25011][runtime] Introduce vertex parallelism decider and the default implementation.

2021-12-07 Thread GitBox


wanglijie95 commented on a change in pull request #17952:
URL: https://github.com/apache/flink/pull/17952#discussion_r764582435



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDecider.java
##
@@ -81,52 +88,36 @@ private int calculateParallelism(List consumedResults) {
 .reduce(0L, Long::sum))
 .sum();
 
-if (broadcastBytes > dataVolumePerTask
-|| (broadcastBytes == dataVolumePerTask && nonBroadcastBytes > 0)) {
-LOG.warn(
-"The minimum size of one task to process is larger than "
-+ "the size of data volume which is configured by "
-+ "'"
-+ JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.key()
-+ "'. "
-+ "Parallelism will be set to {}.",
-maxParallelism);
-
-return maxParallelism;
-} else if (broadcastBytes == dataVolumePerTask) {
-return minParallelism;
-} else {
-int parallelism =
-(int)
-Math.ceil(
-(double) nonBroadcastBytes
-/ (dataVolumePerTask - broadcastBytes));
-parallelism = Math.max(parallelism, minParallelism);
-parallelism = Math.min(parallelism, maxParallelism);
-return parallelism;
-}
-}
-
-/** The factory to instantiate {@link DefaultVertexParallelismDecider}. */
-public static class Factory implements VertexParallelismDecider.Factory {
+long expectedMaxBroadcastBytes =
+(long) Math.ceil((dataVolumePerTask * CAP_RATIO_OF_BROADCAST));
 
-private final Configuration configuration;
+if (broadcastBytes > expectedMaxBroadcastBytes) {
+LOG.info(
+"The number of broadcast bytes: {} is larger than the expected maximum value: {} ('{}' * {})."
++ " Use the expected maximum value as the number of broadcast bytes to decide the parallelism.",
+broadcastBytes,
+expectedMaxBroadcastBytes,
+JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.key(),
+CAP_RATIO_OF_BROADCAST);
 
-public Factory(Configuration configuration) {
-this.configuration = configuration;
+broadcastBytes = expectedMaxBroadcastBytes;
 }
 
-@Override
-public VertexParallelismDecider create() {
-return new DefaultVertexParallelismDecider(
-configuration.getInteger(
-JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM),
-configuration.getInteger(
-JobManagerOptions.ADAPTIVE_BACH_SCHEDULER_MIN_PARALLELISM),
-configuration.get(
-JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK),
-configuration.get(
-JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DEFAULT_SOURCE_PARALLELISM));
-}
+int parallelism =
+(int) Math.ceil((double) nonBroadcastBytes / (dataVolumePerTask - broadcastBytes));
+parallelism = Math.max(parallelism, minParallelism);
+parallelism = Math.min(parallelism, maxParallelism);
+return parallelism;
+}
+
+public static DefaultVertexParallelismDecider from(Configuration configuration) {
+return new DefaultVertexParallelismDecider(

Review comment:
   Yes, you are right. I will add the checks in the constructor. Since 
`dataVolumePerTask` is of type `MemorySize`, I will check that it is not null.
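The new decision logic in the diff, together with the constructor checks discussed in the reply, can be sketched as a standalone class. This is a simplified sketch, not the PR's code: `dataVolumePerTask` is a plain `long` byte count instead of `MemorySize`, the class name `ParallelismSketch` is hypothetical, and the value of `CAP_RATIO_OF_BROADCAST` is not shown in the diff, so 0.5 is an assumed placeholder.

```java
import java.util.Objects;

/**
 * Hypothetical sketch of the broadcast-capped parallelism calculation from the diff.
 * Uses a plain byte count instead of MemorySize; CAP_RATIO_OF_BROADCAST is an assumed value.
 */
public final class ParallelismSketch {

    // Assumption: the real constant's value is not visible in the diff.
    static final double CAP_RATIO_OF_BROADCAST = 0.5;

    private final int minParallelism;
    private final int maxParallelism;
    private final long dataVolumePerTask;

    public ParallelismSketch(int minParallelism, int maxParallelism, Long dataVolumePerTask) {
        // Constructor checks of the kind discussed in the review.
        Objects.requireNonNull(dataVolumePerTask, "dataVolumePerTask must not be null");
        if (minParallelism <= 0 || maxParallelism < minParallelism) {
            throw new IllegalArgumentException("invalid parallelism bounds");
        }
        if (dataVolumePerTask <= 0) {
            throw new IllegalArgumentException("dataVolumePerTask must be positive");
        }
        this.minParallelism = minParallelism;
        this.maxParallelism = maxParallelism;
        this.dataVolumePerTask = dataVolumePerTask;
    }

    /** Decides the parallelism from the broadcast and non-broadcast bytes a vertex consumes. */
    public int decide(long broadcastBytes, long nonBroadcastBytes) {
        // Cap the broadcast bytes at a fixed fraction of the per-task data volume, as in the diff.
        long expectedMaxBroadcastBytes =
                (long) Math.ceil(dataVolumePerTask * CAP_RATIO_OF_BROADCAST);
        if (broadcastBytes > expectedMaxBroadcastBytes) {
            broadcastBytes = expectedMaxBroadcastBytes;
        }
        // Each task processes the remaining volume of non-broadcast data.
        int parallelism =
                (int) Math.ceil((double) nonBroadcastBytes / (dataVolumePerTask - broadcastBytes));
        parallelism = Math.max(parallelism, minParallelism);
        return Math.min(parallelism, maxParallelism);
    }
}
```

For example, with a per-task volume of 100 bytes, 80 broadcast bytes are capped at 50, so 900 non-broadcast bytes give `ceil(900 / 50) = 18` tasks.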








[jira] [Commented] (FLINK-25132) KafkaSource cannot work with object-reusing DeserializationSchema

2021-12-07 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17454980#comment-17454980
 ] 

Qingsheng Ren commented on FLINK-25132:
---

[~mason6345] I think so. We'll backport it to 1.13 after fixing this on master.

> KafkaSource cannot work with object-reusing DeserializationSchema
> -
>
> Key: FLINK-25132
> URL: https://issues.apache.org/jira/browse/FLINK-25132
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.0, 1.15.0, 1.14.1
>Reporter: Qingsheng Ren
>Assignee: Qingsheng Ren
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1
>
>
> Currently, the Kafka source deserializes ConsumerRecords in the split reader and puts 
> them into the elementQueue; the task's main thread then polls these records from 
> the queue asynchronously. This mechanism does not work with 
> DeserializationSchemas that reuse objects: all records waiting in the element 
> queue point to the same object.
> A solution would be to move deserialization to the RecordEmitter, which runs in 
> the task's main thread. 
> Note that this issue actually affects all sources that deserialize 
> in the split reader. 
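The hazard described in the issue can be reproduced without Kafka. In this hypothetical sketch (plain Java; no Flink or Kafka classes, all names invented for illustration), a "split reader" thread deserializes into a single reused mutable object and enqueues the reference, so every queued element ends up showing the last value written.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical demo of object reuse across a hand-off queue (not Flink or Kafka code). */
public final class ObjectReuseDemo {

    /** Mutable record, standing in for the output of an object-reusing deserializer. */
    static final class MutableRecord {
        String value;
    }

    /**
     * "Deserializes" raw records on a separate reader thread and hands them to the calling
     * thread through a queue. When reuse is true, one instance is refilled for every record.
     */
    public static List<String> run(List<String> rawRecords, boolean reuse) {
        BlockingQueue<MutableRecord> elementQueue = new LinkedBlockingQueue<>();
        MutableRecord reused = new MutableRecord();

        Thread splitReader = new Thread(() -> {
            for (String raw : rawRecords) {
                MutableRecord next = reuse ? reused : new MutableRecord();
                next.value = raw; // overwrites the shared instance when reuse is true
                elementQueue.add(next);
            }
        });
        splitReader.start();
        try {
            // Wait so the result is deterministic; in a real source the main thread
            // polls concurrently, so the corruption is timing-dependent.
            splitReader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        List<String> emitted = new ArrayList<>();
        MutableRecord polled;
        while ((polled = elementQueue.poll()) != null) {
            emitted.add(polled.value);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b", "c");
        System.out.println("with reuse:    " + run(input, true));  // [c, c, c]
        System.out.println("without reuse: " + run(input, false)); // [a, b, c]
    }
}
```

The `reuse = false` path corresponds to handing over independent objects. Moving deserialization to the consuming thread, as the issue proposes, avoids the problem differently: each reused object is emitted before it is overwritten.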





[GitHub] [flink] MyLanPangzi commented on a change in pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-12-07 Thread GitBox


MyLanPangzi commented on a change in pull request #14376:
URL: https://github.com/apache/flink/pull/14376#discussion_r764575993



##
File path: 
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatUtils.java
##
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.google.protobuf.Descriptors;
+import org.apache.commons.lang3.StringUtils;
+
+/** Protobuf function util. */
+public class PbFormatUtils {
+
+/**
+ * Protobuf converts f_abc_7d to fAbc7d in {@code
+ * com.google.protobuf.Descriptors.FieldDescriptor.getJsonName()}, but we actually need fAbc7D.
+ */
+public static String fieldNameToJsonName(String name) {
+final int length = name.length();
+StringBuilder result = new StringBuilder(length);
+boolean isNextUpperCase = false;
+for (int i = 0; i < length; i++) {
+char ch = name.charAt(i);
+if (ch == '_') {
+isNextUpperCase = true;
+} else if (isNextUpperCase) {
+if ('a' <= ch && ch <= 'z') {
+ch = (char) (ch - 'a' + 'A');
+isNextUpperCase = false;
+}
+result.append(ch);
+} else {
+result.append(ch);
+}
+}
+return result.toString();
+}
+
+private static String getJavaPackageFromProtoFile(Descriptors.Descriptor descriptor) {
+boolean hasJavaPackage = descriptor.getFile().getOptions().hasJavaPackage();
+if (hasJavaPackage) {
+String javaPackage = descriptor.getFile().getOptions().getJavaPackage();
+if (StringUtils.isBlank(javaPackage)) {
+throw new FlinkRuntimeException("java_package cannot be blank string");
+}
+return javaPackage;
+} else {
+String packageName = descriptor.getFile().getPackage();
+if (StringUtils.isBlank(packageName)) {
+throw new FlinkRuntimeException("package and java_package cannot both be empty");
+}
+return packageName;
+}
+}
+
+public static String getFullJavaName(Descriptors.Descriptor descriptor) {
+String javaPackageName = getJavaPackageFromProtoFile(descriptor);
+if (descriptor.getFile().getOptions().getJavaMultipleFiles()) {
+// multiple_files=true
+if (null != descriptor.getContainingType()) {
+// nested type
+String parentJavaFullName = getFullJavaName(descriptor.getContainingType());
+return parentJavaFullName + "." + descriptor.getName();
+} else {
+// top level message
+return javaPackageName + "." + descriptor.getName();
+}
+} else {
+// multiple_files=false
+if (null != descriptor.getContainingType()) {
+// nested type
+String parentJavaFullName = getFullJavaName(descriptor.getContainingType());
+return parentJavaFullName + "." + descriptor.getName();
+} else {
+// top level message
+if (!descriptor.getFile().getOptions().hasJavaOuterClassname()) {
+// the user did not define an outer class name in the proto file
+return javaPackageName

Review comment:
   @maosuhan 
   1. I found some bugs. When I don't declare the **java_outer_classname=XXX** or 
**java_multiple_files=true** option in the .proto file (proto2), the generated class name is 
x.y.z.xxxOuterClass.MyPBClass, which is incorrect.
   
   Generated code: `public static RowData decode(data.LogOuterClass.Log message){`
   My protobuf class: `data.Data.Log`
   
   2. How do you identify inner classes?
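Regarding point 1: as we understand protoc's Java naming rules, when `java_outer_classname` is not set (and `java_multiple_files` is false), the outer class name is derived from the .proto file name converted to CamelCase, and `OuterClass` is appended only when that name collides with a type declared in the file. Assuming the message `Log` lives in a file named `data.proto` (the file name is not given in the comment), the expected class would be `data.Data.Log`, which matches the reviewer's code. The sketch below is a simplified, hypothetical re-implementation under those assumptions; it only checks the type names the caller passes in.

```java
import java.util.Set;

/**
 * Hypothetical helper approximating protoc's default Java outer class name
 * (used when java_outer_classname is not set). Simplified re-implementation.
 */
public final class OuterClassNames {

    private OuterClassNames() {}

    /** Upper-cases the first letter, letters after a separator and letters after a digit. */
    static String fileNameToCamelCase(String input) {
        StringBuilder result = new StringBuilder(input.length());
        boolean capNextLetter = true;
        for (char ch : input.toCharArray()) {
            if (ch >= 'a' && ch <= 'z') {
                result.append(capNextLetter ? Character.toUpperCase(ch) : ch);
                capNextLetter = false;
            } else if (ch >= 'A' && ch <= 'Z') {
                result.append(ch);
                capNextLetter = false;
            } else if (ch >= '0' && ch <= '9') {
                result.append(ch);
                capNextLetter = true;
            } else {
                capNextLetter = true; // separators such as '_' or '-' are dropped
            }
        }
        return result.toString();
    }

    /**
     * @param protoPath path of the .proto file, e.g. "data/data.proto"
     * @param typeNames names of the messages, enums and services declared in that file
     */
    public static String defaultOuterClassName(String protoPath, Set<String> typeNames) {
        String base = protoPath.substring(protoPath.lastIndexOf('/') + 1);
        if (base.endsWith(".proto")) {
            base = base.substring(0, base.length() - ".proto".length());
        }
        String name = fileNameToCamelCase(base);
        // On a name clash with a declared type, "OuterClass" is appended.
        return typeNames.contains(name) ? name + "OuterClass" : name;
    }

    public static void main(String[] args) {
        System.out.println(defaultOuterClassName("data/data.proto", Set.of("Log"))); // Data
        System.out.println(defaultOuterClassName("log.proto", Set.of("Log")));       // LogOuterClass
    }
}
```

With `java_multiple_files=true`, top-level messages are instead generated as their own classes directly in the Java package, so no outer class name appears in the fully qualified name.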





[GitHub] [flink] flinkbot edited a comment on pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #16108:
URL: https://github.com/apache/flink/pull/16108#issuecomment-856641884


   
   ## CI report:
   
   * d7d66d77b7ac632cee04bfe17c7671e048535fc1 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2)
 
   
   




[jira] [Commented] (FLINK-23391) KafkaSourceReaderTest.testKafkaSourceMetrics fails on azure

2021-12-07 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17454978#comment-17454978
 ] 

Qingsheng Ren commented on FLINK-23391:
---

This one is quite similar to FLINK-20928 and is probably caused by the flaky 
{{NetworkFailureProxy}} between Flink and Kafka. I'll try adding a retry first, 
and refactor the current Kafka test infrastructure after the legacy 
{{FlinkKafkaConsumer}} and {{FlinkKafkaProducer}} are removed.
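A retry of the kind mentioned above could look like the following generic helper. It is a hypothetical sketch: `Retry.withRetries` is not an existing Flink test utility, and the fixed pause between attempts is an assumption.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

/** Hypothetical retry helper for flaky test steps (not part of Flink's test utilities). */
public final class Retry {

    private Retry() {}

    /**
     * Calls the action up to maxAttempts times, sleeping for backoff between failed attempts.
     * Throws an IllegalStateException carrying the last failure if every attempt fails.
     */
    public static <T> T withRetries(Callable<T> action, int maxAttempts, Duration backoff) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoff.toMillis());
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("interrupted while retrying", ie);
                    }
                }
            }
        }
        throw new IllegalStateException("all " + maxAttempts + " attempts failed", lastFailure);
    }
}
```

Keeping `maxAttempts` small means a genuine regression still fails quickly instead of being masked by retries.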

> KafkaSourceReaderTest.testKafkaSourceMetrics fails on azure
> ---
>
> Key: FLINK-23391
> URL: https://issues.apache.org/jira/browse/FLINK-23391
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.0, 1.13.1
>Reporter: Xintong Song
>Assignee: Qingsheng Ren
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.15.0, 1.14.1, 1.13.4
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=20456=logs=c5612577-f1f7-5977-6ff6-7432788526f7=53f6305f-55e6-561c-8f1e-3a1dde2c77df=6783
> {code}
> Jul 14 23:00:26 [ERROR] Tests run: 10, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 99.93 s <<< FAILURE! - in 
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReaderTest
> Jul 14 23:00:26 [ERROR] 
> testKafkaSourceMetrics(org.apache.flink.connector.kafka.source.reader.KafkaSourceReaderTest)
>   Time elapsed: 60.225 s  <<< ERROR!
> Jul 14 23:00:26 java.util.concurrent.TimeoutException: Offsets are not 
> committed successfully. Dangling offsets: 
> {15213={KafkaSourceReaderTest-0=OffsetAndMetadata{offset=10, 
> leaderEpoch=null, metadata=''}}}
> Jul 14 23:00:26   at 
> org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:210)
> Jul 14 23:00:26   at 
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReaderTest.testKafkaSourceMetrics(KafkaSourceReaderTest.java:275)
> Jul 14 23:00:26   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Jul 14 23:00:26   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jul 14 23:00:26   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jul 14 23:00:26   at java.lang.reflect.Method.invoke(Method.java:498)
> Jul 14 23:00:26   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> Jul 14 23:00:26   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jul 14 23:00:26   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> Jul 14 23:00:26   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jul 14 23:00:26   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Jul 14 23:00:26   at 
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
> Jul 14 23:00:26   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jul 14 23:00:26   at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> Jul 14 23:00:26   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Jul 14 23:00:26   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> Jul 14 23:00:26   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> Jul 14 23:00:26   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> Jul 14 23:00:26   at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> Jul 14 23:00:26   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> Jul 14 23:00:26   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> Jul 14 23:00:26   at 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> Jul 14 23:00:26   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> Jul 14 23:00:26   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Jul 14 23:00:26   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Jul 14 23:00:26   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> Jul 14 23:00:26   at org.junit.runners.Suite.runChild(Suite.java:128)
> Jul 14 23:00:26   at org.junit.runners.Suite.runChild(Suite.java:27)
> Jul 14 23:00:26   at 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> Jul 14 23:00:26   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> Jul 14 23:00:26   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> Jul 

[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #32: [FLINK-24817] Add Naive Bayes implementation

2021-12-07 Thread GitBox


yunfengzhou-hub commented on a change in pull request #32:
URL: https://github.com/apache/flink-ml/pull/32#discussion_r764572915



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/naivebayes/NaiveBayesModel.java
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.naivebayes;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.BLAS;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/** A Model which classifies data using the model data computed by {@link NaiveBayes}. */
+public class NaiveBayesModel
+implements Model, NaiveBayesModelParams {
+private final Map, Object> paramMap = new HashMap<>();
+private Table modelDataTable;
+
+public NaiveBayesModel() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public Table[] transform(Table... inputs) {
+Preconditions.checkArgument(inputs.length == 1);
+
+final String predictionCol = getPredictionCol();
+final String featuresCol = getFeaturesCol();
+final String broadcastModelKey = "NaiveBayesModelStream";
+
+RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+RowTypeInfo outputTypeInfo =
+new RowTypeInfo(
+ArrayUtils.addAll(
+inputTypeInfo.getFieldTypes(), TypeInformation.of(Integer.class)),
+ArrayUtils.addAll(inputTypeInfo.getFieldNames(), predictionCol));
+
+StreamTableEnvironment tEnv =
+(StreamTableEnvironment) ((TableImpl) modelDataTable).getTableEnvironment();
+DataStream modelDataStream =
+NaiveBayesModelData.getModelDataStream(modelDataTable);
+DataStream input = tEnv.toDataStream(inputs[0]);
+
+Map> broadcastMap = new HashMap<>();
+broadcastMap.put(broadcastModelKey, modelDataStream);
+
+Function>, DataStream> function =
+dataStreams -> {
+DataStream stream = dataStreams.get(0);
+return stream.transform(
+this.getClass().getSimpleName(),
+outputTypeInfo,
+new 
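For readers following the model-side logic: a Naive Bayes classifier over categorical features predicts the label that maximizes log P(label) plus the sum of log P(feature value | label). The sketch below shows only that scoring step in plain Java. The array layout (`logTheta[label][feature][valueIndex]`) and the class name are assumptions for illustration, not the model data layout used by `NaiveBayesModelData` in this PR.

```java
/** Hypothetical categorical Naive Bayes scoring step (not the PR's implementation). */
public final class NaiveBayesScoring {

    private NaiveBayesScoring() {}

    /**
     * @param logPrior logPrior[label] = log P(label)
     * @param logTheta logTheta[label][feature][value] = log P(feature = value | label)
     * @param features features[j] = observed value index of feature j
     * @return the label with the highest unnormalized log posterior, or -1 if there are no labels
     */
    public static int predict(double[] logPrior, double[][][] logTheta, int[] features) {
        int bestLabel = -1;
        double bestScore = Double.NEGATIVE_INFINITY;
        for (int label = 0; label < logPrior.length; label++) {
            // Sum in log space to avoid underflow from multiplying many small probabilities.
            double score = logPrior[label];
            for (int j = 0; j < features.length; j++) {
                score += logTheta[label][j][features[j]];
            }
            if (bestLabel == -1 || score > bestScore) {
                bestScore = score;
                bestLabel = label;
            }
        }
        return bestLabel;
    }
}
```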

[GitHub] [flink] flinkbot edited a comment on pull request #18050: [FLINK-25034][runtime] Support flexible number of subpartitions in IntermediateResultPartition

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #18050:
URL: https://github.com/apache/flink/pull/18050#issuecomment-988466514


   
   ## CI report:
   
   * 5e36ba1eacf427ca4665de18b6f71253fb53f0e8 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27771)
 
   * daeb5ff033438d2aa6f314a756ca9f1c5f5838b5 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27778)
 
   
   




[GitHub] [flink] flinkbot edited a comment on pull request #18050: [FLINK-25034][runtime] Support flexible number of subpartitions in IntermediateResultPartition

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #18050:
URL: https://github.com/apache/flink/pull/18050#issuecomment-988466514


   
   ## CI report:
   
   * 5e36ba1eacf427ca4665de18b6f71253fb53f0e8 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27771)
 
   * daeb5ff033438d2aa6f314a756ca9f1c5f5838b5 UNKNOWN
   
   




[GitHub] [flink] deadwind4 commented on pull request #18033: [FLINK-25141][connector/elasticsearch] add sink parallelism option

2021-12-07 Thread GitBox


deadwind4 commented on pull request #18033:
URL: https://github.com/apache/flink/pull/18033#issuecomment-988524457


   @fapaul please review it






[GitHub] [flink] flinkbot edited a comment on pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #16108:
URL: https://github.com/apache/flink/pull/16108#issuecomment-856641884


   
   ## CI report:
   
   * 850a2914588ba4bf362f5b800a053708c32bac22 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27774)
 
   * d7d66d77b7ac632cee04bfe17c7671e048535fc1 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2)
 
   
   




[GitHub] [flink] flinkbot edited a comment on pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #16108:
URL: https://github.com/apache/flink/pull/16108#issuecomment-856641884


   
   ## CI report:
   
   * 5f040e7cb9eddcadd58fe0f37047c1fbd872512c Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27769)
 
   * 850a2914588ba4bf362f5b800a053708c32bac22 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27774)
 
   * d7d66d77b7ac632cee04bfe17c7671e048535fc1 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2)
 
   
   




[GitHub] [flink] flinkbot edited a comment on pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #16108:
URL: https://github.com/apache/flink/pull/16108#issuecomment-856641884


   
   ## CI report:
   
   * 5f040e7cb9eddcadd58fe0f37047c1fbd872512c Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27769)
 
   * 850a2914588ba4bf362f5b800a053708c32bac22 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27774)
 
   * d7d66d77b7ac632cee04bfe17c7671e048535fc1 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] paul8263 commented on a change in pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …

2021-12-07 Thread GitBox


paul8263 commented on a change in pull request #16108:
URL: https://github.com/apache/flink/pull/16108#discussion_r764562387



##
File path: flink-core/src/main/java/org/apache/flink/util/FileLock.java
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.OverlappingFileLockException;
+import java.nio.file.Paths;
+
+/** A file lock used for avoiding race condition among multiple unit test processes */
+public class FileLock {
+private static final long TRY_LOCK_INTERVAL = 50;
+private final File file;
+private FileOutputStream outputStream;
+private java.nio.channels.FileLock lock;
+
+public FileLock(String fileName) {
+this.file = new File(fileName);
+}
+
+private void init() throws IOException {
+String filePath =
+Paths.get(System.getProperty("java.io.tmpdir"), file.getName()).toString();
+File lockFile = new File(filePath);
+if (!lockFile.exists()) {
+lockFile.createNewFile();
+}
+
+outputStream = new FileOutputStream(lockFile);
+}

Review comment:
   Hi @AHeise,
   `IOException` is not thrown just because the file already exists; we would get this exception if the parent directory of the file has not been created.
   Sorry, I messed up the code while I was testing the snippet somewhere else. There is no need to create another `lockFile`.
   I reconsidered the `tryLock` issue. I think releasing/destroying the lock should happen outside `tryLock` and be called explicitly even if acquiring the lock fails.
   Correct me if I am wrong.
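To make the suggested ordering concrete (acquire with `tryLock`, but release and close in a separate step that runs even when acquisition fails), here is a minimal sketch that uses only `java.nio`. It is not the PR's `FileLock` class: `TmpDirFileLock` and its methods are hypothetical names, and it assumes, as in the diff above, that the lock file lives directly in `java.io.tmpdir`.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

/** Hypothetical cross-process lock on a file in java.io.tmpdir; a sketch, not Flink's class. */
class TmpDirFileLock implements AutoCloseable {
    private final Path path;
    private FileChannel channel;
    private FileLock lock;

    TmpDirFileLock(String fileName) {
        // Only the last path element is kept, so the lock file always lives in java.io.tmpdir.
        String name = Paths.get(fileName).getFileName().toString();
        this.path = Paths.get(System.getProperty("java.io.tmpdir"), name);
    }

    /** Makes one non-blocking attempt; returns false if another holder owns the lock. */
    boolean tryLock() throws IOException {
        if (channel == null) {
            // Opening fails with an IOException if the parent directory is missing, so create it first.
            Files.createDirectories(path.getParent());
            // CREATE opens the file if it exists and creates it otherwise; no separate exists() check.
            channel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        }
        try {
            lock = channel.tryLock(); // null if another process holds an overlapping lock
        } catch (OverlappingFileLockException e) {
            lock = null; // this JVM already holds a lock on the file
        }
        return lock != null;
    }

    /** Releases the lock if held and closes the channel; safe to call after a failed tryLock. */
    @Override
    public void close() throws IOException {
        try {
            if (lock != null && lock.isValid()) {
                lock.release();
            }
        } finally {
            lock = null;
            if (channel != null) {
                channel.close();
                channel = null;
            }
        }
    }
}
```

A caller would use it in try-with-resources, so `close()` runs whether or not `tryLock()` succeeded. Per the `java.nio.channels.FileLock` Javadoc, on some systems closing any channel to a file releases all of the JVM's locks on that file, so a process should keep a single instance per lock file.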








[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765


   
   ## CI report:
   
   * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN
   * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN
   * 5cf0fa22d2ad2e24c9b4c088097999f2eff8f106 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27772)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #17692: [FLINK-24489][CEP] The size of entryCache & eventsBufferCache in the SharedBuffer should be defined with a threshold to limit the num

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #17692:
URL: https://github.com/apache/flink/pull/17692#issuecomment-961748638


   
   ## CI report:
   
   * 5ee2449e410094eec0c7523a4dfee58091668398 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27678)
 
   * 5378879134e1c08ada5239c650d2ce77cff0f3e3 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27776)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #17692: [FLINK-24489][CEP] The size of entryCache & eventsBufferCache in the SharedBuffer should be defined with a threshold to limit the num

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #17692:
URL: https://github.com/apache/flink/pull/17692#issuecomment-961748638


   
   ## CI report:
   
   * 5ee2449e410094eec0c7523a4dfee58091668398 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27678)
 
   * 5378879134e1c08ada5239c650d2ce77cff0f3e3 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #16108:
URL: https://github.com/apache/flink/pull/16108#issuecomment-856641884


   
   ## CI report:
   
   * 5f040e7cb9eddcadd58fe0f37047c1fbd872512c Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27769)
 
   * 850a2914588ba4bf362f5b800a053708c32bac22 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27774)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #18048: [FLINK-20370][table] part1: Fix wrong results when sink primary key is not the same with query result's changelog upsert key

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #18048:
URL: https://github.com/apache/flink/pull/18048#issuecomment-988409163


   
   ## CI report:
   
   * 03c879c08980eedf2bba6b46a6e5eeb0a0df31b8 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27763)
 
   * c10e864f5d0434074c43910f6eef5d2ae1aba793 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27775)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #18048: [FLINK-20370][table] part1: Fix wrong results when sink primary key is not the same with query result's changelog upsert key

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #18048:
URL: https://github.com/apache/flink/pull/18048#issuecomment-988409163


   
   ## CI report:
   
   * 03c879c08980eedf2bba6b46a6e5eeb0a0df31b8 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27763)
 
   * c10e864f5d0434074c43910f6eef5d2ae1aba793 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #18048: [FLINK-20370][table] part1: Fix wrong results when sink primary key is not the same with query result's changelog upsert key

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #18048:
URL: https://github.com/apache/flink/pull/18048#issuecomment-988409163


   
   ## CI report:
   
   * 03c879c08980eedf2bba6b46a6e5eeb0a0df31b8 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27763)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #16108:
URL: https://github.com/apache/flink/pull/16108#issuecomment-856641884


   
   ## CI report:
   
   * a696994e8970d98e2f77aa7a4e1db93bb0ecbe53 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27718)
 
   * 5f040e7cb9eddcadd58fe0f37047c1fbd872512c Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27769)
 
   * 850a2914588ba4bf362f5b800a053708c32bac22 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27774)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #16108:
URL: https://github.com/apache/flink/pull/16108#issuecomment-856641884


   
   ## CI report:
   
   * a696994e8970d98e2f77aa7a4e1db93bb0ecbe53 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27718)
 
   * 5f040e7cb9eddcadd58fe0f37047c1fbd872512c Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27769)
 
   * 850a2914588ba4bf362f5b800a053708c32bac22 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not the same

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #17939:
URL: https://github.com/apache/flink/pull/17939#issuecomment-981420515


   
   ## CI report:
   
   * 9ccb311aec0b5f0d14197c5de37a171ea15c6097 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27244)
 
   * a124edc8f7930c1f9a7078418b2117646e464a0e Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27773)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765


   
   ## CI report:
   
   * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN
   * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN
   * 888452faa35e98b1151416484edfcf9e097f680f Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27741)
 
   * 5cf0fa22d2ad2e24c9b4c088097999f2eff8f106 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27772)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not the same

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #17939:
URL: https://github.com/apache/flink/pull/17939#issuecomment-981420515


   
   ## CI report:
   
   * 9ccb311aec0b5f0d14197c5de37a171ea15c6097 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27244)
 
   * a124edc8f7930c1f9a7078418b2117646e464a0e UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] lincoln-lil commented on pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not the same for

2021-12-07 Thread GitBox


lincoln-lil commented on pull request #17939:
URL: https://github.com/apache/flink/pull/17939#issuecomment-988482409


   Thanks for your review, @JingsongLi!
   I've addressed your comments and updated the PR.






[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765


   
   ## CI report:
   
   * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN
   * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN
   * 888452faa35e98b1151416484edfcf9e097f680f Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27741)
 
   * 5cf0fa22d2ad2e24c9b4c088097999f2eff8f106 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765


   
   ## CI report:
   
   * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN
   * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN
   * 888452faa35e98b1151416484edfcf9e097f680f Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27741)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765


   
   ## CI report:
   
   * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN
   * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN
   * 888452faa35e98b1151416484edfcf9e097f680f Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27741)
 
   * 5cf0fa22d2ad2e24c9b4c088097999f2eff8f106 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] zhuzhurk commented on a change in pull request #17952: [FLINK-25011][runtime] Introduce vertex parallelism decider and the default implementation.

2021-12-07 Thread GitBox


zhuzhurk commented on a change in pull request #17952:
URL: https://github.com/apache/flink/pull/17952#discussion_r764533772



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDecider.java
##
@@ -81,52 +88,36 @@ private int calculateParallelism(List consumedResults) {
 .reduce(0L, Long::sum))
 .sum();
 
-if (broadcastBytes > dataVolumePerTask
-|| (broadcastBytes == dataVolumePerTask && nonBroadcastBytes > 0)) {
-LOG.warn(
-"The minimum size of one task to process is larger than "
-+ "the size of data volume which is configured by "
-+ "'"
-+ JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.key()
-+ "'. "
-+ "Parallelism will be set to {}.",
-maxParallelism);
-
-return maxParallelism;
-} else if (broadcastBytes == dataVolumePerTask) {
-return minParallelism;
-} else {
-int parallelism =
-(int)
-Math.ceil(
-(double) nonBroadcastBytes
-/ (dataVolumePerTask - broadcastBytes));
-parallelism = Math.max(parallelism, minParallelism);
-parallelism = Math.min(parallelism, maxParallelism);
-return parallelism;
-}
-}
-
-/** The factory to instantiate {@link DefaultVertexParallelismDecider}. */
-public static class Factory implements VertexParallelismDecider.Factory {
+long expectedMaxBroadcastBytes =
+(long) Math.ceil((dataVolumePerTask * CAP_RATIO_OF_BROADCAST));
 
-private final Configuration configuration;
+if (broadcastBytes > expectedMaxBroadcastBytes) {
+LOG.info(
+"The number of broadcast bytes: {} is larger than the expected maximum value: {} ('{}' * {})."
++ " Use the expected maximum value as the number of broadcast bytes to decide the parallelism.",
+broadcastBytes,
+expectedMaxBroadcastBytes,
+JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.key(),
+CAP_RATIO_OF_BROADCAST);
 
-public Factory(Configuration configuration) {
-this.configuration = configuration;
+broadcastBytes = expectedMaxBroadcastBytes;
 }
 
-@Override
-public VertexParallelismDecider create() {
-return new DefaultVertexParallelismDecider(
-configuration.getInteger(
-JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM),
-configuration.getInteger(
-JobManagerOptions.ADAPTIVE_BACH_SCHEDULER_MIN_PARALLELISM),
-configuration.get(
-JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK),
-configuration.get(
-JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DEFAULT_SOURCE_PARALLELISM));
-}
+int parallelism =
+(int) Math.ceil((double) nonBroadcastBytes / (dataVolumePerTask - broadcastBytes));
+parallelism = Math.max(parallelism, minParallelism);
+parallelism = Math.min(parallelism, maxParallelism);
+return parallelism;
+}
+
+public static DefaultVertexParallelismDecider from(Configuration configuration) {
+return new DefaultVertexParallelismDecider(

Review comment:
   I think we need some checks on the parameters, e.g. `minParallelism > 0`, `minParallelism < maxParallelism`, `dataVolumePerTask > 0` and `defaultSourceParallelism > 0`.








[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765


   
   ## CI report:
   
   * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN
   * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN
   * 888452faa35e98b1151416484edfcf9e097f680f Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27741)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765


   
   ## CI report:
   
   * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN
   * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN
   * 888452faa35e98b1151416484edfcf9e097f680f Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27741)
 
   * 5cf0fa22d2ad2e24c9b4c088097999f2eff8f106 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[jira] [Updated] (FLINK-25218) Performance issues with lookup join accessing external dimension tables

2021-12-07 Thread ZhuoYu Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZhuoYu Chen updated FLINK-25218:

Affects Version/s: 1.15.0

>  Performance issues with lookup join accessing external dimension tables
> 
>
> Key: FLINK-25218
> URL: https://issues.apache.org/jira/browse/FLINK-25218
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.15.0
>Reporter: ZhuoYu Chen
>Priority: Major
>
> Current lookup join: for each input record, it accesses the external dimension table
> to get the result and outputs one record.
> Implement a lookup join that improves performance by batching and delaying lookups.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25218) Performance issues with lookup join accessing external dimension tables

2021-12-07 Thread ZhuoYu Chen (Jira)
ZhuoYu Chen created FLINK-25218:
---

 Summary:  Performance issues with lookup join accessing external dimension tables
 Key: FLINK-25218
 URL: https://issues.apache.org/jira/browse/FLINK-25218
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: ZhuoYu Chen


Current lookup join: for each input record, it accesses the external dimension table to get the result and outputs one record.

Implement a lookup join that improves performance by batching and delaying lookups.
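The batching idea can be sketched independently of Flink's lookup-join operators. Everything below is hypothetical: `BatchingLookupJoiner` is not a Flink class, and the `batchLookup` function stands in for a dimension-table client that can resolve many keys in one request. Only size-triggered flushing is shown; the "delaying" part of the proposal would also need a timer-triggered `flush()` so that records are not buffered indefinitely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical lookup joiner that buffers input keys and queries the dimension table per batch. */
final class BatchingLookupJoiner<K, V> {
    private final int batchSize;
    private final Function<List<K>, Map<K, V>> batchLookup; // one round trip for many keys
    private final BiConsumer<K, V> output;
    private final List<K> buffer = new ArrayList<>();
    private int lookupCalls = 0;

    BatchingLookupJoiner(int batchSize, Function<List<K>, Map<K, V>> batchLookup, BiConsumer<K, V> output) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.batchLookup = batchLookup;
        this.output = output;
    }

    /** Buffers one input record; queries the table only when the batch is full. */
    void processElement(K key) {
        buffer.add(key);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Resolves all buffered keys with a single lookup and emits matches in input order (inner join). */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<K> distinctKeys = new ArrayList<K>(new LinkedHashSet<K>(buffer));
        Map<K, V> results = batchLookup.apply(distinctKeys);
        lookupCalls++;
        for (K key : buffer) {
            if (results.containsKey(key)) {
                output.accept(key, results.get(key));
            }
        }
        buffer.clear();
    }

    int lookupCalls() {
        return lookupCalls;
    }
}
```

With a batch size of 2, three input records cost two table queries instead of three; a larger `batchSize` trades per-record latency for fewer round trips.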





[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765


   
   ## CI report:
   
   * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN
   * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN
   * 888452faa35e98b1151416484edfcf9e097f680f Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27741)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765


   
   ## CI report:
   
   * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN
   * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN
   * 888452faa35e98b1151416484edfcf9e097f680f Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27741)
 
   * 5cf0fa22d2ad2e24c9b4c088097999f2eff8f106 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] maosuhan commented on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-12-07 Thread GitBox


maosuhan commented on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-988473669


   @MyLanPangzi I have updated the code to adaptively convert between protobuf enums and Flink numeric/string types, and added unit tests.
   Since codegen in Java is a little tricky and hard to read, I have converted the source files containing codegen logic to Scala.
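The enum mapping described here can be illustrated without protobuf on the classpath. In this sketch `Color` is a hand-written stand-in for a protobuf-generated enum (generated Java enums expose `getNumber()` and a static `forNumber(int)`), and `EnumConversions` is a hypothetical helper, not the PR's code generator: a Flink numeric column maps to the enum's field number and a string column maps to its name.

```java
/** Hand-written stand-in for a protobuf-generated enum. */
enum Color {
    RED(0),
    GREEN(1),
    BLUE(2);

    private final int number;

    Color(int number) {
        this.number = number;
    }

    int getNumber() {
        return number;
    }

    /** Mirrors generated code: returns null for an unknown number. */
    static Color forNumber(int number) {
        for (Color c : values()) {
            if (c.number == number) {
                return c;
            }
        }
        return null;
    }
}

/** Hypothetical conversions between the enum and Flink INT / STRING values. */
final class EnumConversions {
    static int toInt(Color c) {
        return c.getNumber();
    }

    static String toStr(Color c) {
        return c.name();
    }

    static Color fromInt(int number) {
        Color c = Color.forNumber(number);
        if (c == null) {
            throw new IllegalArgumentException("Unknown enum number: " + number);
        }
        return c;
    }

    static Color fromStr(String name) {
        // Enum.valueOf throws IllegalArgumentException for an unknown name.
        return Color.valueOf(name);
    }
}
```

Whether an unknown number or name should fail, as in this sketch, or map to a default value is a design decision the sketch does not settle.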






[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765


   
   ## CI report:
   
   * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN
   * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN
   * 888452faa35e98b1151416484edfcf9e097f680f Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27741)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] lincoln-lil commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not t

2021-12-07 Thread GitBox


lincoln-lil commented on a change in pull request #17939:
URL: https://github.com/apache/flink/pull/17939#discussion_r764528915



##
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##
@@ -140,6 +140,25 @@
 + "or force materialization(FORCE).")
 .build());
 
+@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+public static final ConfigOption TABLE_EXEC_SINK_SHUFFLE_BY_PK =
+key("table.exec.sink.pk-shuffle")

Review comment:
   I prefer 'keyed-shuffle'. What do you think?








[GitHub] [flink] flinkbot edited a comment on pull request #14376: [FLINK-18202][PB format] New Format of protobuf

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-744252765


   
   ## CI report:
   
   * a721a9d5f663117a203628547a19b33b23927ac2 UNKNOWN
   * 8f1051527a46aeba13aadc2db3c5f34d8aff78d2 UNKNOWN
   * 888452faa35e98b1151416484edfcf9e097f680f Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27741)
 
   * 5cf0fa22d2ad2e24c9b4c088097999f2eff8f106 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] lincoln-lil commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not t

2021-12-07 Thread GitBox


lincoln-lil commented on a change in pull request #17939:
URL: https://github.com/apache/flink/pull/17939#discussion_r764527157



##
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##
@@ -140,6 +140,25 @@
 + "or force materialization(FORCE).")
 .build());
 
+@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+public static final ConfigOption TABLE_EXEC_SINK_SHUFFLE_BY_PK =
+key("table.exec.sink.pk-shuffle")

Review comment:
   I've considered several candidates, but none is fully satisfying. 'key-by-mode' is a little subtle; what about 'key-by-pk'?
   
   








[GitHub] [flink] flinkbot edited a comment on pull request #18050: [FLINK-25034][runtime] Support flexible number of subpartitions in IntermediateResultPartition

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #18050:
URL: https://github.com/apache/flink/pull/18050#issuecomment-988466514


   
   ## CI report:
   
   * 5e36ba1eacf427ca4665de18b6f71253fb53f0e8 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27771)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on pull request #18049: [FLINK-24813][table-planner-blink] Improve ImplicitTypeConversionITCase

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #18049:
URL: https://github.com/apache/flink/pull/18049#issuecomment-988465299


   
   ## CI report:
   
   * da40e9da154b7273f6db5eb12d65c983636cd1e6 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27770)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot commented on pull request #18050: [FLINK-25034][runtime] Support flexible number of subpartitions in IntermediateResultPartition

2021-12-07 Thread GitBox


flinkbot commented on pull request #18050:
URL: https://github.com/apache/flink/pull/18050#issuecomment-988466514


   
   ## CI report:
   
   * 5e36ba1eacf427ca4665de18b6f71253fb53f0e8 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot commented on pull request #18049: [FLINK-24813][table-planner-blink] Improve ImplicitTypeConversionITCase

2021-12-07 Thread GitBox


flinkbot commented on pull request #18049:
URL: https://github.com/apache/flink/pull/18049#issuecomment-988465918


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit da40e9da154b7273f6db5eb12d65c983636cd1e6 (Wed Dec 08 
03:29:41 UTC 2021)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   






[GitHub] [flink] flinkbot commented on pull request #18050: [FLINK-25034][runtime] Support flexible number of subpartitions in IntermediateResultPartition

2021-12-07 Thread GitBox


flinkbot commented on pull request #18050:
URL: https://github.com/apache/flink/pull/18050#issuecomment-988465904


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 5e36ba1eacf427ca4665de18b6f71253fb53f0e8 (Wed Dec 08 
03:29:39 UTC 2021)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   






[GitHub] [flink] zhuzhurk commented on a change in pull request #17952: [FLINK-25011][runtime] Introduce vertex parallelism decider and the default implementation.

2021-12-07 Thread GitBox


zhuzhurk commented on a change in pull request #17952:
URL: https://github.com/apache/flink/pull/17952#discussion_r764522853



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDeciderTest.java
##
@@ -58,22 +72,22 @@ public void testSourceJobVertex() {
 }
 
 @Test
-public void testBroadcastBytesGreaterThanPerTaskBytes() {
-BlockingResultInfo resultInfo1 =
-new BlockingResultInfo(Arrays.asList(BYTE_512_MB, BYTE_1_GB), true);
-BlockingResultInfo resultInfo2 = new BlockingResultInfo(Arrays.asList(BYTE_1_GB), false);
+public void testDecideParallelism() {
+BlockingResultInfo resultInfo1 = new BlockingResultInfo(Arrays.asList(BYTE_256_MB), true);
+BlockingResultInfo resultInfo2 =
+new BlockingResultInfo(Arrays.asList(BYTE_256_MB, BYTE_8_GB), false);
 
 int parallelism =
 decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2));
 
-assertThat(parallelism, is(MAX_PARALLELISM));
+assertThat(parallelism, is(11));
 }
 
 @Test
-public void testBroadcastBytesEqualToPerTaskBytesAndHaveNonBroadcastBytes() {
-BlockingResultInfo resultInfo1 =
-new BlockingResultInfo(Arrays.asList(BYTE_512_MB, BYTE_512_MB), true);
-BlockingResultInfo resultInfo2 = new BlockingResultInfo(Arrays.asList(BYTE_512_MB), false);
+public void testDecidedParallelismIsMaxParallelism() {

Review comment:
   maybe testInitiallyDecidedParallelismIsLargerThanMaxParallelism?
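The suggested test names imply that the decider first computes an initial parallelism and then bounds it by a minimum and a maximum. Assuming that reading, the bounding step looks roughly like the sketch below; `ParallelismBounds` and `bound` are hypothetical names, not code from the PR.

```java
/** Hypothetical helper illustrating min/max bounding; not the actual decider code. */
final class ParallelismBounds {
    private ParallelismBounds() {}

    /** Bounds an initially decided parallelism to [minParallelism, maxParallelism]. */
    static int bound(int initiallyDecided, int minParallelism, int maxParallelism) {
        if (minParallelism < 1 || minParallelism > maxParallelism) {
            throw new IllegalArgumentException(
                    "Invalid bounds: min=" + minParallelism + ", max=" + maxParallelism);
        }
        return Math.max(minParallelism, Math.min(maxParallelism, initiallyDecided));
    }
}
```

A test named as suggested would feed inputs whose initial value exceeds the maximum (or falls below the minimum) and assert that the result equals that bound.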








[GitHub] [flink] zhuzhurk commented on a change in pull request #17952: [FLINK-25011][runtime] Introduce vertex parallelism decider and the default implementation.

2021-12-07 Thread GitBox


zhuzhurk commented on a change in pull request #17952:
URL: https://github.com/apache/flink/pull/17952#discussion_r764522853



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDeciderTest.java
##
@@ -58,22 +72,22 @@ public void testSourceJobVertex() {
 }
 
 @Test
-public void testBroadcastBytesGreaterThanPerTaskBytes() {
-BlockingResultInfo resultInfo1 =
-new BlockingResultInfo(Arrays.asList(BYTE_512_MB, BYTE_1_GB), true);
-BlockingResultInfo resultInfo2 = new BlockingResultInfo(Arrays.asList(BYTE_1_GB), false);
+public void testDecideParallelism() {
+BlockingResultInfo resultInfo1 = new BlockingResultInfo(Arrays.asList(BYTE_256_MB), true);
+BlockingResultInfo resultInfo2 =
+new BlockingResultInfo(Arrays.asList(BYTE_256_MB, BYTE_8_GB), false);
 
 int parallelism =
 decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2));
 
-assertThat(parallelism, is(MAX_PARALLELISM));
+assertThat(parallelism, is(11));
 }
 
 @Test
-public void testBroadcastBytesEqualToPerTaskBytesAndHaveNonBroadcastBytes() {
-BlockingResultInfo resultInfo1 =
-new BlockingResultInfo(Arrays.asList(BYTE_512_MB, BYTE_512_MB), true);
-BlockingResultInfo resultInfo2 = new BlockingResultInfo(Arrays.asList(BYTE_512_MB), false);
+public void testDecidedParallelismIsMaxParallelism() {

Review comment:
   maybe testInitiallyDecidedParallelismIsLargerThanMinParallelism?








[GitHub] [flink] flinkbot commented on pull request #18049: [FLINK-24813][table-planner-blink] Improve ImplicitTypeConversionITCase

2021-12-07 Thread GitBox


flinkbot commented on pull request #18049:
URL: https://github.com/apache/flink/pull/18049#issuecomment-988465299


   
   ## CI report:
   
   * da40e9da154b7273f6db5eb12d65c983636cd1e6 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] lincoln-lil commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not t

2021-12-07 Thread GitBox


lincoln-lil commented on a change in pull request #17939:
URL: https://github.com/apache/flink/pull/17939#discussion_r764522531



##
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##
@@ -232,26 +246,30 @@ private int deriveSinkParallelism(
  * messages.
  */
 private Transformation applyKeyBy(
-ChangelogMode changelogMode,
+TableConfig config,
 Transformation inputTransform,
 int[] primaryKeys,
 int sinkParallelism,
-boolean upsertMaterialize) {
-final int inputParallelism = inputTransform.getParallelism();
-if ((inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT))
-&& !upsertMaterialize) {
-return inputTransform;
+int inputParallelism,
+boolean inputInsertOnly,
+boolean needMaterialize) {
+boolean sameParallelism = sinkParallelism == inputParallelism;
+final ExecutionConfigOptions.SinkShuffleByPk sinkShuffleByPk =
+config.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SINK_SHUFFLE_BY_PK);
+boolean sinkKeyBy = false;
+switch (sinkShuffleByPk) {
+case NONE:
+break;
+case AUTO:
+sinkKeyBy = inputInsertOnly && !sameParallelism;
+break;
+case FORCE:

Review comment:
   The only exception is single parallelism; in all other cases a keyBy will be forced.
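The decision encoded by the quoted switch can be sketched as a standalone function. `SinkShuffleByPk` below is a local stand-in for the enum referenced in the diff, the FORCE branch uses the expression from the other quoted snippet of this diff (`!(sinkParallelism == 1 && inputParallelism == 1)`), and the rest of `applyKeyBy` (for example how `needMaterialize` is used) is not shown in the thread and is left out. `SinkKeyByDecision` is a hypothetical name.

```java
/** Local stand-in for ExecutionConfigOptions.SinkShuffleByPk as quoted in the diff. */
enum SinkShuffleByPk {
    NONE,
    AUTO,
    FORCE
}

/** Hypothetical extraction of the switch in the quoted applyKeyBy diff. */
final class SinkKeyByDecision {
    private SinkKeyByDecision() {}

    static boolean sinkKeyBy(
            SinkShuffleByPk mode,
            int inputParallelism,
            int sinkParallelism,
            boolean inputInsertOnly) {
        boolean sameParallelism = sinkParallelism == inputParallelism;
        switch (mode) {
            case NONE:
                // Never add a keyBy.
                return false;
            case AUTO:
                // Mirrors the quoted AUTO branch as written in the diff.
                return inputInsertOnly && !sameParallelism;
            case FORCE:
                // Single parallelism on both sides is the only case that skips the keyBy.
                return !(sinkParallelism == 1 && inputParallelism == 1);
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }
}
```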








[jira] [Updated] (FLINK-25034) Support flexible number of subpartitions in IntermediateResultPartition

2021-12-07 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-25034:
---
Labels: pull-request-available  (was: )

> Support flexible number of subpartitions in IntermediateResultPartition
> ---
>
> Key: FLINK-25034
> URL: https://issues.apache.org/jira/browse/FLINK-25034
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Lijie Wang
>Assignee: Lijie Wang
>Priority: Major
>  Labels: pull-request-available
>
> Currently, when a task is deployed, it needs to know the parallelism of its 
> consumer job vertex. This is because the consumer vertex parallelism is 
> needed to decide the _numberOfSubpartitions_ of _PartitionDescriptor_ which 
> is part of the _ResultPartitionDeploymentDescriptor_. The reason behind 
> that is, at the moment, for one result partition, different subpartitions 
> serve different consumer execution vertices. More specifically, one consumer 
> execution vertex only consumes data from the subpartition with the same index. 
> Considering a dynamic graph, the parallelism of a job vertex may not have 
> been decided when its upstream vertices are deployed. To enable Flink to work 
> in this case, we need a way to allow an execution vertex to run without 
> knowing the parallelism of its consumer job vertices. One basic idea is to 
> enable multiple subpartitions in one result partition to serve the same 
> consumer execution vertex.
> To achieve this goal, we can set the number of subpartitions to be the *max 
> parallelism* of the consumer job vertex. When the consumer vertex is 
> deployed, it should be assigned a subpartition range to consume.
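One way to give each consumer a contiguous subpartition range, once the number of subpartitions equals the consumer's max parallelism, is an even split in which range sizes differ by at most one. The formula and the `SubpartitionRanges` helper below are assumptions for illustration; the issue does not say how ranges are computed.

```java
/** Hypothetical helper; the even-split formula is an assumption, not taken from the issue. */
final class SubpartitionRanges {
    private SubpartitionRanges() {}

    /**
     * Returns {start, end} (both inclusive) of the subpartitions read by consumer {@code index},
     * splitting numSubpartitions as evenly as possible among consumerParallelism consumers.
     */
    static int[] rangeFor(int index, int consumerParallelism, int numSubpartitions) {
        if (consumerParallelism < 1 || consumerParallelism > numSubpartitions) {
            throw new IllegalArgumentException(
                    "Parallelism must be in [1, " + numSubpartitions + "]: " + consumerParallelism);
        }
        if (index < 0 || index >= consumerParallelism) {
            throw new IllegalArgumentException("Consumer index out of range: " + index);
        }
        // Long arithmetic avoids int overflow in index * numSubpartitions.
        int start = (int) ((long) index * numSubpartitions / consumerParallelism);
        int end = (int) ((long) (index + 1) * numSubpartitions / consumerParallelism) - 1;
        return new int[] {start, end};
    }
}
```

With 128 subpartitions and a decided parallelism of 3, the consumers get [0, 41], [42, 84] and [85, 127]. When the parallelism equals the number of subpartitions, each consumer gets exactly the subpartition with its own index, which matches the current one-to-one behavior.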



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] wanglijie95 opened a new pull request #18050: [FLINK-25034][runtime] Support flexible number of subpartitions in IntermediateResultPartition

2021-12-07 Thread GitBox


wanglijie95 opened a new pull request #18050:
URL: https://github.com/apache/flink/pull/18050


   ## What is the purpose of the change
   
   Support a flexible number of subpartitions in IntermediateResultPartition. When the consumer parallelism has been decided, the number of subpartitions should be the **parallelism** of the consumer job vertex (all-to-all). When the consumer parallelism has not been decided, the number of subpartitions should be the **max parallelism** of the consumer job vertex.
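The rule for choosing the number of subpartitions can be summarized in a few lines. This sketch models "not yet decided" as an empty `OptionalInt`, which is an assumption for illustration; `SubpartitionCount` is a hypothetical name, not code from the PR.

```java
import java.util.OptionalInt;

/** Hypothetical helper; an undecided consumer parallelism is modeled as an empty OptionalInt. */
final class SubpartitionCount {
    private SubpartitionCount() {}

    static int numberOfSubpartitions(
            OptionalInt decidedConsumerParallelism, int consumerMaxParallelism) {
        if (consumerMaxParallelism < 1) {
            throw new IllegalArgumentException("Invalid max parallelism: " + consumerMaxParallelism);
        }
        // Decided: use the consumer parallelism. Undecided: fall back to its max parallelism.
        int n = decidedConsumerParallelism.orElse(consumerMaxParallelism);
        if (n < 1 || n > consumerMaxParallelism) {
            throw new IllegalArgumentException("Invalid consumer parallelism: " + n);
        }
        return n;
    }
}
```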
   
   ## Brief change log
5e36ba1eacf427ca4665de18b6f71253fb53f0e8 Support flexible number of 
subpartitions in IntermediateResultPartition.
   
   
   ## Verifying this change
   Unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (**no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**no**)
 - The serializers: (**no**)
 - The runtime per-record code paths (performance sensitive): (**no**)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (**no**)
 - The S3 file system connector: (**no**)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**no**)
 - If yes, how is the feature documented? (**not applicable**)
   






[GitHub] [flink] zhuzhurk commented on a change in pull request #17952: [FLINK-25011][runtime] Introduce vertex parallelism decider and the default implementation.

2021-12-07 Thread GitBox


zhuzhurk commented on a change in pull request #17952:
URL: https://github.com/apache/flink/pull/17952#discussion_r764522054



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDeciderTest.java
##
@@ -82,32 +96,32 @@ public void testBroadcastBytesEqualToPerTaskBytesAndHaveNonBroadcastBytes() {
 }
 
 @Test
-public void testBroadcastBytesEqualToPerTaskBytesAndNoNonBroadcastBytes() {
-BlockingResultInfo resultInfo =
-new BlockingResultInfo(Arrays.asList(BYTE_512_MB, BYTE_512_MB), true);
+public void testDecidedParallelismIsMinParallelism() {

Review comment:
   maybe testInitiallyDecidedParallelismIsSmallerThanMinParallelism?








[GitHub] [flink] xuyangzhong opened a new pull request #18049: [FLINK-24813][table-planner-blink] Improve ImplicitTypeConversionITCase

2021-12-07 Thread GitBox


xuyangzhong opened a new pull request #18049:
URL: https://github.com/apache/flink/pull/18049


   ## What is the purpose of the change
   
   Delete the test file ImplicitTypeConversionITCase and merge its test cases into ImplicitConversionEqualsFunctionITCase, so that it follows the same code style as CastFunctionITCase.
   
   ## Brief change log
   
 - delete the unused code
 - merge the test cases into the existing test cases
   
   ## Verifying this change
   
   Test cases have been added to verify this change.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? 
   






[GitHub] [flink] flinkbot edited a comment on pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #16108:
URL: https://github.com/apache/flink/pull/16108#issuecomment-856641884


   
   ## CI report:
   
   * a696994e8970d98e2f77aa7a4e1db93bb0ecbe53 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27718)
 
   * 5f040e7cb9eddcadd58fe0f37047c1fbd872512c Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27769)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] zhuzhurk commented on a change in pull request #17952: [FLINK-25011][runtime] Introduce vertex parallelism decider and the default implementation.

2021-12-07 Thread GitBox


zhuzhurk commented on a change in pull request #17952:
URL: https://github.com/apache/flink/pull/17952#discussion_r764519958



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java
##
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import java.util.List;
+
+/** The blocking result info, which will be used to calculate the vertex parallelism. */
+public class BlockingResultInfo {
+
+private final List blockingPartitionSizes;
+
+private final boolean isBroadcast;
+
+public BlockingResultInfo(List blockingPartitionSizes, boolean isBroadcast) {

Review comment:
   I prefer to introduce two factory methods to make the flag easier to understand, and then make this constructor private. Maybe `createFromBroadcastResult()` and `createFromNonBroadcastResult()`?
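A minimal sketch of the suggested shape: a private constructor plus two named factory methods, so call sites no longer pass a bare `true`/`false`. The element type `List<Long>` (partition sizes in bytes) is an assumption, since the quoted diff lost its generic parameters, and the factory names follow the reviewer's suggestion rather than any merged code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Sketch of the suggested API; field names follow the quoted diff, the rest is assumed. */
final class BlockingResultInfo {
    private final List<Long> blockingPartitionSizes;
    private final boolean isBroadcast;

    private BlockingResultInfo(List<Long> blockingPartitionSizes, boolean isBroadcast) {
        // Defensive, read-only copy of the per-partition sizes.
        this.blockingPartitionSizes =
                Collections.unmodifiableList(new ArrayList<>(blockingPartitionSizes));
        this.isBroadcast = isBroadcast;
    }

    static BlockingResultInfo createFromBroadcastResult(List<Long> blockingPartitionSizes) {
        return new BlockingResultInfo(blockingPartitionSizes, true);
    }

    static BlockingResultInfo createFromNonBroadcastResult(List<Long> blockingPartitionSizes) {
        return new BlockingResultInfo(blockingPartitionSizes, false);
    }

    List<Long> getBlockingPartitionSizes() {
        return blockingPartitionSizes;
    }

    boolean isBroadcast() {
        return isBroadcast;
    }
}
```

A test call such as `new BlockingResultInfo(Arrays.asList(BYTE_256_MB), true)` would then read `BlockingResultInfo.createFromBroadcastResult(Arrays.asList(BYTE_256_MB))`.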








[GitHub] [flink] lincoln-lil commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not t

2021-12-07 Thread GitBox


lincoln-lil commented on a change in pull request #17939:
URL: https://github.com/apache/flink/pull/17939#discussion_r764519877



##
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##
@@ -232,26 +246,30 @@ private int deriveSinkParallelism(
  * messages.
  */
 private Transformation applyKeyBy(
-ChangelogMode changelogMode,
+TableConfig config,
 Transformation inputTransform,
 int[] primaryKeys,
 int sinkParallelism,
-boolean upsertMaterialize) {
-final int inputParallelism = inputTransform.getParallelism();
-if ((inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT))
-&& !upsertMaterialize) {
-return inputTransform;
+int inputParallelism,
+boolean inputInsertOnly,
+boolean needMaterialize) {
+boolean sameParallelism = sinkParallelism == inputParallelism;
+final ExecutionConfigOptions.SinkShuffleByPk sinkShuffleByPk =
+config.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SINK_SHUFFLE_BY_PK);
+boolean sinkKeyBy = false;
+switch (sinkShuffleByPk) {
+case NONE:
+break;
+case AUTO:
+sinkKeyBy = inputInsertOnly && !sameParallelism;
+break;
+case FORCE:
+// single parallelism has no problem
+sinkKeyBy = !(sinkParallelism == 1 && inputParallelism == 1);

Review comment:
   I thought this highlighted the single-parallelism exception, but now it feels like there's no difference between the two versions.








[GitHub] [flink] paul8263 commented on a change in pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …

2021-12-07 Thread GitBox


paul8263 commented on a change in pull request #16108:
URL: https://github.com/apache/flink/pull/16108#discussion_r764519555



##
File path: flink-core/src/main/java/org/apache/flink/util/FileLock.java
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.OverlappingFileLockException;
+import java.nio.file.Paths;
+
+/** A file lock used for avoiding race condition among multiple unit test processes */
+public class FileLock {
+private static final long TRY_LOCK_INTERVAL = 50;
+private final File file;
+private FileOutputStream outputStream;
+private java.nio.channels.FileLock lock;
+
+public FileLock(String fileName) {
+this.file = new File(fileName);

Review comment:
   I added a normalization method which only allows letters, slashes and 
backslashes.
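A minimal sketch of both pieces discussed in this thread, under stated assumptions. The normalization replaces every character other than ASCII letters, '/' and '\' with '_' (the comment only says which characters are allowed, not what happens to the rest). Acquisition polls `FileChannel.tryLock()` every 50 ms, mirroring the `TRY_LOCK_INTERVAL` constant in the diff. `InterProcessFileLock` is a hypothetical name, not the `FileLock` class from the PR.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;

/** Hypothetical sketch of an inter-process lock file; not the FileLock class from the PR. */
final class InterProcessFileLock implements AutoCloseable {
    /** Polling interval, mirroring TRY_LOCK_INTERVAL = 50 in the diff (assumed to be ms). */
    private static final long TRY_LOCK_INTERVAL_MS = 50;

    private final FileOutputStream outputStream;
    private final FileLock lock;

    private InterProcessFileLock(FileOutputStream outputStream, FileLock lock) {
        this.outputStream = outputStream;
        this.lock = lock;
    }

    /** Keeps ASCII letters and path separators; other characters become '_' (assumed policy). */
    static String normalizeFileName(String fileName) {
        return fileName.replaceAll("[^a-zA-Z/\\\\]", "_");
    }

    /** Blocks until an exclusive lock on the normalized file is held. */
    static InterProcessFileLock acquire(String fileName)
            throws IOException, InterruptedException {
        FileOutputStream out = new FileOutputStream(new File(normalizeFileName(fileName)));
        try {
            while (true) {
                FileLock acquired;
                try {
                    // Returns null if another process holds an overlapping lock.
                    acquired = out.getChannel().tryLock();
                } catch (OverlappingFileLockException e) {
                    // The lock is already held (or being acquired) inside this JVM.
                    acquired = null;
                }
                if (acquired != null) {
                    return new InterProcessFileLock(out, acquired);
                }
                Thread.sleep(TRY_LOCK_INTERVAL_MS);
            }
        } catch (IOException | InterruptedException | RuntimeException e) {
            try {
                out.close();
            } catch (IOException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }

    @Override
    public void close() throws IOException {
        try {
            lock.release();
        } finally {
            outputStream.close();
        }
    }
}
```

Callers would typically use it in try-with-resources so the lock is released even when a test fails.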








[GitHub] [flink] flinkbot edited a comment on pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #16108:
URL: https://github.com/apache/flink/pull/16108#issuecomment-856641884


   
   ## CI report:
   
   * a696994e8970d98e2f77aa7a4e1db93bb0ecbe53 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27718)
 
   * 5f040e7cb9eddcadd58fe0f37047c1fbd872512c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] paul8263 commented on a change in pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …

2021-12-07 Thread GitBox


paul8263 commented on a change in pull request #16108:
URL: https://github.com/apache/flink/pull/16108#discussion_r764519129



##
File path: flink-core/src/main/java/org/apache/flink/util/FileLock.java
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.OverlappingFileLockException;
+import java.nio.file.Paths;
+
+/** A file lock used for avoiding race condition among multiple unit test processes */
+public class FileLock {
+private static final long TRY_LOCK_INTERVAL = 50;
+private final File file;
+private FileOutputStream outputStream;
+private java.nio.channels.FileLock lock;
+
+public FileLock(String fileName) {
+this.file = new File(fileName);
+}
+
+private void init() throws IOException {
+String filePath =
+Paths.get(System.getProperty("java.io.tmpdir"), file.getName()).toString();
+File lockFile = new File(filePath);
+if (!lockFile.exists()) {
+lockFile.createNewFile();
+}
+
+outputStream = new FileOutputStream(lockFile);
+}
+
+public void lock() throws IOException {
+if (outputStream == null) {
+init();
+}
+try {
+lock = outputStream.getChannel().lock();
+} catch (OverlappingFileLockException e) {
+try {
+Thread.sleep(TRY_LOCK_INTERVAL);
+} catch (InterruptedException ignored) {
+}
+lock();

Review comment:
   Yes, it should be wrapped in a while loop instead of a recursive 
invocation. I will remove the lock method, though, because we don't need it.
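If a blocking lock method were kept, the recursive retry could become a loop. The sketch below assumes the caller passes in the channel. The helper name `lockWithRetry` is hypothetical. Per the JDK docs, `FileChannel.lock()` already blocks while another process holds the lock. `OverlappingFileLockException` means this JVM already holds, or is waiting on, an overlapping lock, so only that case is retried. Unlike the quoted code, this version propagates `InterruptedException` instead of swallowing it.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;

final class LockRetry {
    private LockRetry() {}

    /** Hypothetical helper: acquires an exclusive lock, retrying in a loop instead of recursing. */
    static FileLock lockWithRetry(FileChannel channel, long retryIntervalMillis)
            throws IOException, InterruptedException {
        while (true) {
            try {
                // Blocks until no other process holds a conflicting lock.
                return channel.lock();
            } catch (OverlappingFileLockException e) {
                // This JVM already holds or awaits an overlapping lock: back off, then retry.
                Thread.sleep(retryIntervalMillis);
            }
        }
    }
}
```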








[GitHub] [flink] paul8263 commented on a change in pull request #16108: [FLINK-22821][connector-kafka] FlinkKafkaProducerMigrationTest fails …

2021-12-07 Thread GitBox


paul8263 commented on a change in pull request #16108:
URL: https://github.com/apache/flink/pull/16108#discussion_r764518525



##
File path: flink-core/src/main/java/org/apache/flink/util/FileLock.java
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.OverlappingFileLockException;
+import java.nio.file.Paths;
+
+/** A file lock used for avoiding race condition among multiple unit test processes */
+public class FileLock {

Review comment:
   I will add it.








[GitHub] [flink] lincoln-lil commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not t

2021-12-07 Thread GitBox


lincoln-lil commented on a change in pull request #17939:
URL: https://github.com/apache/flink/pull/17939#discussion_r764518374



##
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##
@@ -232,26 +246,30 @@ private int deriveSinkParallelism(
  * messages.
  */
 private Transformation applyKeyBy(
-ChangelogMode changelogMode,
+TableConfig config,
 Transformation inputTransform,
 int[] primaryKeys,
 int sinkParallelism,
-boolean upsertMaterialize) {
-final int inputParallelism = inputTransform.getParallelism();
-if ((inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT))
-&& !upsertMaterialize) {
-return inputTransform;
+int inputParallelism,
+boolean inputInsertOnly,
+boolean needMaterialize) {
+boolean sameParallelism = sinkParallelism == inputParallelism;
+final ExecutionConfigOptions.SinkShuffleByPk sinkShuffleByPk =
+config.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SINK_SHUFFLE_BY_PK);
+boolean sinkKeyBy = false;
+switch (sinkShuffleByPk) {
+case NONE:
+break;
+case AUTO:
+sinkKeyBy = inputInsertOnly && !sameParallelism;
+break;
+case FORCE:
+// single parallelism has no problem
+sinkKeyBy = !(sinkParallelism == 1 && inputParallelism == 1);
+break;
 }
-if (primaryKeys.length == 0) {
-throw new TableException(
-String.format(
-"The sink for table '%s' has a configured parallelism of %s, while the input parallelism is %s. "
-+ "Since the configured parallelism is different from the input's parallelism and "
-+ "the changelog mode is not insert-only, a primary key is required but could not "
-+ "be found.",
-tableSinkSpec.getObjectIdentifier().asSummaryString(),
-sinkParallelism,
-inputParallelism));
+if (!sinkKeyBy && !needMaterialize) {
+return inputTransform;
 }
 
 final RowDataKeySelector selector =

Review comment:
   Good catch! We should not add an empty key selector here (it causes all the 
data to go into one parallel task), even though the result is correct. I'll add a case 
for this.
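Here is why an empty key selector funnels everything into one task. With hash partitioning, every record with the same key lands on the same channel, and an empty key is the same for every record. The sketch below assumes a simplified partitioner that takes `Arrays.hashCode` modulo the parallelism. Flink's actual routing goes through key groups, but a constant key still maps to a single subtask. All names are illustrative.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class EmptyKeyPartitioning {
    private EmptyKeyPartitioning() {}

    /** Simplified hash partitioner (assumption; not Flink's key-group assignment). */
    static int channelFor(Object[] key, int parallelism) {
        return Math.floorMod(Arrays.hashCode(key), parallelism);
    }

    /** Extracts the fields at the given indices as the record's key. */
    static Object[] extractKey(Object[] row, int[] keyIndices) {
        Object[] key = new Object[keyIndices.length];
        for (int i = 0; i < keyIndices.length; i++) {
            key[i] = row[keyIndices[i]];
        }
        return key;
    }

    /** Returns the distinct channels that the given rows are routed to. */
    static Set<Integer> channelsUsed(List<Object[]> rows, int[] keyIndices, int parallelism) {
        Set<Integer> used = new HashSet<>();
        for (Object[] row : rows) {
            used.add(channelFor(extractKey(row, keyIndices), parallelism));
        }
        return used;
    }
}
```

With `keyIndices = new int[0]`, every key is the empty array, so `channelsUsed` returns a single channel no matter how many rows there are.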








[GitHub] [flink] flinkbot edited a comment on pull request #17988: [FLINK-25010][Connectors/Hive] Speed up hive's createMRSplits by multi thread

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #17988:
URL: https://github.com/apache/flink/pull/17988#issuecomment-984363654


   
   ## CI report:
   
   * 28b8d68b9819361880a3893eb021688617a2e5fb Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27401)
 
   * 92f5f7c95542c9a08863324d5c885106e554a6c2 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27768)
 
   
   
   






[GitHub] [flink] flinkbot edited a comment on pull request #17988: [FLINK-25010][Connectors/Hive] Speed up hive's createMRSplits by multi thread

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #17988:
URL: https://github.com/apache/flink/pull/17988#issuecomment-984363654


   
   ## CI report:
   
   * 28b8d68b9819361880a3893eb021688617a2e5fb Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27401)
 
   * 92f5f7c95542c9a08863324d5c885106e554a6c2 UNKNOWN
   
   
   






[GitHub] [flink] flinkbot edited a comment on pull request #17988: [FLINK-25010][Connectors/Hive] Speed up hive's createMRSplits by multi thread

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #17988:
URL: https://github.com/apache/flink/pull/17988#issuecomment-984363654


   
   ## CI report:
   
   * 28b8d68b9819361880a3893eb021688617a2e5fb Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27401)
 
   * 8b88d70ff7a9f271d2de0dcaef43b49b310193f2 UNKNOWN
   
   
   






[jira] [Closed] (FLINK-24730) Support prompt style in SqlCli

2021-12-07 Thread Yubin Li (Jira)


 [ https://issues.apache.org/jira/browse/FLINK-24730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yubin Li closed FLINK-24730.

Resolution: Fixed

> Support prompt style in SqlCli
> --
>
> Key: FLINK-24730
> URL: https://issues.apache.org/jira/browse/FLINK-24730
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Affects Versions: 1.15.0
>Reporter: Yubin Li
>Priority: Major
>
> A prompt is a very popular feature in the CLIs of traditional databases such as MySQL. It helps 
> users notice the session context, such as the current db/catalog/time, and avoid 
> running the wrong commands in an unexpected session.
>  
> ||arg||comment||
> |\c|current catalog|
> |\d|current db|
> |\t|current time|
> For example:
> FLINK SQL> set sql-client.prompt.style=\c~\d~\t;
> catalog1~db1~2021-11-12 16:00:00> show current catalog;
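The substitution in the table above could work roughly as follows. This is a minimal sketch that assumes a `yyyy-MM-dd HH:mm:ss` time format and a trailing `> `, both inferred from the example. `PromptFormatter` is a hypothetical name, not the SQL Client's class. Unknown escapes are kept as-is.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

final class PromptFormatter {
    // Assumed time format, matching the "2021-11-12 16:00:00" example.
    private static final DateTimeFormatter TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private PromptFormatter() {}

    /** Expands \c, \d and \t in the style string and appends the prompt terminator. */
    static String format(String style, String catalog, String database, LocalDateTime now) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < style.length(); i++) {
            char c = style.charAt(i);
            String replacement = null;
            if (c == '\\' && i + 1 < style.length()) {
                switch (style.charAt(i + 1)) {
                    case 'c': replacement = catalog; break;
                    case 'd': replacement = database; break;
                    case 't': replacement = TIME_FORMAT.format(now); break;
                    default: break; // unknown escape: copy it literally
                }
            }
            if (replacement != null) {
                out.append(replacement);
                i++; // skip the escape letter
            } else {
                out.append(c);
            }
        }
        return out.append("> ").toString();
    }
}
```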



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [flink] zhuzhurk commented on a change in pull request #17952: [FLINK-25011][runtime] Introduce vertex parallelism decider and the default implementation.

2021-12-07 Thread GitBox


zhuzhurk commented on a change in pull request #17952:
URL: https://github.com/apache/flink/pull/17952#discussion_r764514899



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/VertexParallelismDecider.java
##
@@ -29,17 +29,8 @@
 /**
  * Computing the parallelism.
  *
- * @param consumedResults The size of consumed blocking results.
+ * @param consumedResults The result info of consumed blocking results.

Review comment:
   result info -> information?








[GitHub] [flink] flinkbot edited a comment on pull request #17462: [FLINK-23170] Write metadata after materialization

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #17462:
URL: https://github.com/apache/flink/pull/17462#issuecomment-942024789


   
   ## CI report:
   
   * 459916eaca61c9210c403ee4f4125cfbf3590fa2 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27685)
 
   * ddc8a47426d41b78f448556c5b9a8bbf3ad8c19b Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27767)
 
   
   
   






[GitHub] [flink] flinkbot edited a comment on pull request #17462: [FLINK-23170] Write metadata after materialization

2021-12-07 Thread GitBox


flinkbot edited a comment on pull request #17462:
URL: https://github.com/apache/flink/pull/17462#issuecomment-942024789


   
   ## CI report:
   
   * 459916eaca61c9210c403ee4f4125cfbf3590fa2 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27685)
 
   * ddc8a47426d41b78f448556c5b9a8bbf3ad8c19b UNKNOWN
   
   
   






[GitHub] [flink] curcur commented on pull request #17462: [FLINK-23170] Write metadata after materialization

2021-12-07 Thread GitBox


curcur commented on pull request #17462:
URL: https://github.com/apache/flink/pull/17462#issuecomment-988451245


   squash commits.






[GitHub] [flink] JingsongLi commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not th

2021-12-07 Thread GitBox


JingsongLi commented on a change in pull request #17939:
URL: https://github.com/apache/flink/pull/17939#discussion_r764512993



##
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##
@@ -232,26 +246,30 @@ private int deriveSinkParallelism(
  * messages.
  */
 private Transformation applyKeyBy(
-ChangelogMode changelogMode,
+TableConfig config,
 Transformation inputTransform,
 int[] primaryKeys,
 int sinkParallelism,
-boolean upsertMaterialize) {
-final int inputParallelism = inputTransform.getParallelism();
-if ((inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT))
-&& !upsertMaterialize) {
-return inputTransform;
+int inputParallelism,
+boolean inputInsertOnly,
+boolean needMaterialize) {
+boolean sameParallelism = sinkParallelism == inputParallelism;
+final ExecutionConfigOptions.SinkShuffleByPk sinkShuffleByPk =
+config.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SINK_SHUFFLE_BY_PK);
+boolean sinkKeyBy = false;
+switch (sinkShuffleByPk) {
+case NONE:
+break;
+case AUTO:
+sinkKeyBy = inputInsertOnly && !sameParallelism;
+break;
+case FORCE:
+// single parallelism has no problem
+sinkKeyBy = !(sinkParallelism == 1 && inputParallelism == 1);
+break;
 }
-if (primaryKeys.length == 0) {
-throw new TableException(
-String.format(
-"The sink for table '%s' has a configured parallelism of %s, while the input parallelism is %s. "
-+ "Since the configured parallelism is different from the input's parallelism and "
-+ "the changelog mode is not insert-only, a primary key is required but could not "
-+ "be found.",
-tableSinkSpec.getObjectIdentifier().asSummaryString(),
-sinkParallelism,
-inputParallelism));
+if (!sinkKeyBy && !needMaterialize) {
+return inputTransform;
 }
 
 final RowDataKeySelector selector =

Review comment:
   Does the code reach this point when the table doesn't have a primary key? Would that introduce a bug?








[GitHub] [flink] JingsongLi commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not th

2021-12-07 Thread GitBox


JingsongLi commented on a change in pull request #17939:
URL: https://github.com/apache/flink/pull/17939#discussion_r764511680



##
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##
@@ -140,6 +140,25 @@
 + "or force materialization(FORCE).")
 .build());
 
+@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+public static final ConfigOption TABLE_EXEC_SINK_SHUFFLE_BY_PK =
+key("table.exec.sink.pk-shuffle")

Review comment:
   `table.exec.sink.key-by-mode`?








[GitHub] [flink] JingsongLi commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not th

2021-12-07 Thread GitBox


JingsongLi commented on a change in pull request #17939:
URL: https://github.com/apache/flink/pull/17939#discussion_r764511517



##
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##
@@ -232,26 +246,30 @@ private int deriveSinkParallelism(
  * messages.
  */
 private Transformation applyKeyBy(
-ChangelogMode changelogMode,
+TableConfig config,
 Transformation inputTransform,
 int[] primaryKeys,
 int sinkParallelism,
-boolean upsertMaterialize) {
-final int inputParallelism = inputTransform.getParallelism();
-if ((inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT))
-&& !upsertMaterialize) {
-return inputTransform;
+int inputParallelism,
+boolean inputInsertOnly,
+boolean needMaterialize) {
+boolean sameParallelism = sinkParallelism == inputParallelism;
+final ExecutionConfigOptions.SinkShuffleByPk sinkShuffleByPk =
+config.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SINK_SHUFFLE_BY_PK);
+boolean sinkKeyBy = false;
+switch (sinkShuffleByPk) {
+case NONE:
+break;
+case AUTO:
+sinkKeyBy = inputInsertOnly && !sameParallelism;
+break;
+case FORCE:
+// single parallelism has no problem
+sinkKeyBy = !(sinkParallelism == 1 && inputParallelism == 1);

Review comment:
   `sinkParallelism != 1 || inputParallelism != 1`?
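The suggested form is the De Morgan equivalent of the original, so the rewrite would not change behavior. A quick exhaustive check over small parallelisms (class and method names are illustrative):

```java
final class ForceModeCondition {
    private ForceModeCondition() {}

    /** Condition as written in the diff. */
    static boolean original(int sinkParallelism, int inputParallelism) {
        return !(sinkParallelism == 1 && inputParallelism == 1);
    }

    /** Condition as suggested in the review. */
    static boolean suggested(int sinkParallelism, int inputParallelism) {
        return sinkParallelism != 1 || inputParallelism != 1;
    }

    /** Returns true if both forms agree for every pair of parallelisms in [1, max]. */
    static boolean equivalentUpTo(int maxParallelism) {
        for (int sink = 1; sink <= maxParallelism; sink++) {
            for (int input = 1; input <= maxParallelism; input++) {
                if (original(sink, input) != suggested(sink, input)) {
                    return false;
                }
            }
        }
        return true;
    }
}
```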








[GitHub] [flink] JingsongLi commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not th

2021-12-07 Thread GitBox


JingsongLi commented on a change in pull request #17939:
URL: https://github.com/apache/flink/pull/17939#discussion_r764511367



##
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##
@@ -232,26 +246,30 @@ private int deriveSinkParallelism(
  * messages.
  */
 private Transformation applyKeyBy(
-ChangelogMode changelogMode,
+TableConfig config,
 Transformation inputTransform,
 int[] primaryKeys,
 int sinkParallelism,
-boolean upsertMaterialize) {
-final int inputParallelism = inputTransform.getParallelism();
-if ((inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT))
-&& !upsertMaterialize) {
-return inputTransform;
+int inputParallelism,
+boolean inputInsertOnly,
+boolean needMaterialize) {
+boolean sameParallelism = sinkParallelism == inputParallelism;
+final ExecutionConfigOptions.SinkShuffleByPk sinkShuffleByPk =
+config.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SINK_SHUFFLE_BY_PK);
+boolean sinkKeyBy = false;
+switch (sinkShuffleByPk) {
+case NONE:
+break;
+case AUTO:
+sinkKeyBy = inputInsertOnly && !sameParallelism;
+break;
+case FORCE:

Review comment:
   Ignore me. This already matches my expectation.








[GitHub] [flink] JingsongLi commented on a change in pull request #17939: [FLINK-20370][table] part2: introduce 'table.exec.sink.pk-shuffle' option to auto keyby on sink's pk if parallelism are not th

2021-12-07 Thread GitBox


JingsongLi commented on a change in pull request #17939:
URL: https://github.com/apache/flink/pull/17939#discussion_r764511041



##
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##
@@ -232,26 +246,30 @@ private int deriveSinkParallelism(
  * messages.
  */
 private Transformation applyKeyBy(
-ChangelogMode changelogMode,
+TableConfig config,
 Transformation inputTransform,
 int[] primaryKeys,
 int sinkParallelism,
-boolean upsertMaterialize) {
-final int inputParallelism = inputTransform.getParallelism();
-if ((inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT))
-&& !upsertMaterialize) {
-return inputTransform;
+int inputParallelism,
+boolean inputInsertOnly,
+boolean needMaterialize) {
+boolean sameParallelism = sinkParallelism == inputParallelism;
+final ExecutionConfigOptions.SinkShuffleByPk sinkShuffleByPk =
+config.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SINK_SHUFFLE_BY_PK);
+boolean sinkKeyBy = false;
+switch (sinkShuffleByPk) {
+case NONE:
+break;
+case AUTO:
+sinkKeyBy = inputInsertOnly && !sameParallelism;

Review comment:
   `!sameParallelism` => `sinkParallelism != inputParallelism`
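With the review suggestions applied, the decision could read as below. This is a sketch only. `Mode` stands in for `ExecutionConfigOptions.SinkShuffleByPk`. The final guard against an empty primary key reflects the concern raised about tables without a primary key; it is not code from the PR. Whether materialization without a primary key should fail instead of skipping the keyBy is left open here.

```java
final class SinkKeyByDecision {
    private SinkKeyByDecision() {}

    /** Stand-in for the option's enum; not Flink's class. */
    enum Mode { NONE, AUTO, FORCE }

    /** Whether the configured mode asks for a keyBy on the sink's primary key. */
    static boolean sinkKeyBy(Mode mode, int sinkParallelism, int inputParallelism, boolean inputInsertOnly) {
        switch (mode) {
            case AUTO:
                return inputInsertOnly && sinkParallelism != inputParallelism;
            case FORCE:
                // Single parallelism on both sides has no problem.
                return sinkParallelism != 1 || inputParallelism != 1;
            case NONE:
            default:
                return false;
        }
    }

    /** Hypothetical final decision: never key by an empty key (it would route all data to one task). */
    static boolean needsKeyBy(Mode mode, int[] primaryKeys, int sinkParallelism, int inputParallelism,
            boolean inputInsertOnly, boolean needMaterialize) {
        boolean keyBy = sinkKeyBy(mode, sinkParallelism, inputParallelism, inputInsertOnly) || needMaterialize;
        return keyBy && primaryKeys.length > 0;
    }
}
```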







