[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink
JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink URL: https://github.com/apache/flink/pull/8435#discussion_r291888066 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala ## @@ -55,4 +57,8 @@ class GenericRelDataType( override def hashCode(): Int = { genericType.hashCode() } + + override def generateTypeString(sb: lang.StringBuilder, withDetail: Boolean): Unit = { Review comment: Now we delete `ArrayRelDataType` and use `ArraySqlType`, etc. NOTE: `equals` of `ArraySqlType` is:

```
@Override
public boolean equals(Object obj) {
  if (obj instanceof RelDataTypeImpl) {
    final RelDataTypeImpl that = (RelDataTypeImpl) obj;
    return this.digest.equals(that.digest);
  }
  return false;
}
```

So we must override `generateTypeString` in all of the Calcite extended types. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
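The reasoning in that comment can be sketched with simplified stand-ins for Calcite's `RelDataTypeImpl` (the class and field names below are illustrative assumptions, not Flink or Calcite code): because `equals` compares only the computed `digest` string, any subtype that does not override `generateTypeString` with a distinguishing rendering risks colliding digests and spurious equality.

```java
// Simplified analogue of digest-based type equality. Not Calcite's actual
// classes; names are assumptions for illustration.
abstract class SimpleRelDataType {
    protected String digest;

    // Compute the digest from the type string, mirroring how RelDataTypeImpl
    // derives its digest from generateTypeString.
    protected final void computeDigest() {
        StringBuilder sb = new StringBuilder();
        generateTypeString(sb, true);
        digest = sb.toString();
    }

    abstract void generateTypeString(StringBuilder sb, boolean withDetail);

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof SimpleRelDataType) {
            return this.digest.equals(((SimpleRelDataType) obj).digest);
        }
        return false;
    }

    @Override
    public int hashCode() {
        return digest.hashCode();
    }
}

final class GenericType extends SimpleRelDataType {
    private final String innerTypeName;

    GenericType(String innerTypeName) {
        this.innerTypeName = innerTypeName;
        computeDigest();
    }

    @Override
    void generateTypeString(StringBuilder sb, boolean withDetail) {
        // Without this override, two distinct generic types would share the
        // same default digest and wrongly compare equal.
        sb.append("ANY('").append(innerTypeName).append("')");
    }
}
```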
[GitHub] [flink] docete commented on a change in pull request #8626: [FLINK-12742] Add insert into partition grammar as hive dialect
docete commented on a change in pull request #8626: [FLINK-12742] Add insert into partition grammar as hive dialect URL: https://github.com/apache/flink/pull/8626#discussion_r291887668 ## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java ## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.validate; + +import org.apache.calcite.sql.validate.SqlConformance; + +/** Sql conformance used for flink to set specific sql dialect parser. **/ +public enum FlinkSqlConformance implements SqlConformance { + /** Calcite's default SQL behavior. */ + DEFAULT, + + /** Conformance value that instructs Calcite to use SQL semantics +* consistent with the Apache HIVE, but ignoring its more +* inconvenient or controversial dicta. */ Review comment: Is "dicta" a typo of "dialect"?
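The pattern under review, an enum implementing a conformance interface so the parser can toggle dialect-specific behavior, can be sketched with a minimal, self-contained analogue. The interface and method names below are invented for illustration and are not Calcite's `SqlConformance` API:

```java
// Minimal analogue of a per-dialect conformance enum. The names
// (Conformance, allowsInsertIntoPartition) are illustrative assumptions.
interface Conformance {
    boolean allowsInsertIntoPartition();
}

enum SqlDialectConformance implements Conformance {
    /** Default SQL behavior: no Hive-style PARTITION clause. */
    DEFAULT {
        @Override
        public boolean allowsInsertIntoPartition() {
            return false;
        }
    },

    /** Hive-compatible behavior: INSERT INTO ... PARTITION is accepted. */
    HIVE {
        @Override
        public boolean allowsInsertIntoPartition() {
            return true;
        }
    }
}
```

A validator or parser would then branch on the active conformance value rather than on string comparisons of a dialect name.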
[GitHub] [flink] KurtYoung commented on issue #8673: blink first commit
KurtYoung commented on issue #8673: blink first commit URL: https://github.com/apache/flink/pull/8673#issuecomment-500300991 @waywtd Please don't open this pull request
[GitHub] [flink] flinkbot commented on issue #8673: blink first commit
flinkbot commented on issue #8673: blink first commit URL: https://github.com/apache/flink/pull/8673#issuecomment-500300770 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] waywtd commented on issue #8673: blink first commit
waywtd commented on issue #8673: blink first commit URL: https://github.com/apache/flink/pull/8673#issuecomment-500300630 123
[GitHub] [flink] flinkbot commented on issue #8672: blink first commit
flinkbot commented on issue #8672: blink first commit URL: https://github.com/apache/flink/pull/8672#issuecomment-500300573 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] waywtd opened a new pull request #8673: blink first commit
waywtd opened a new pull request #8673: blink first commit URL: https://github.com/apache/flink/pull/8673 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new 
feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[GitHub] [flink] KurtYoung closed pull request #8672: blink first commit
KurtYoung closed pull request #8672: blink first commit URL: https://github.com/apache/flink/pull/8672
[jira] [Commented] (FLINK-12541) Add deploy a Python Flink job and session cluster on Kubernetes support.
[ https://issues.apache.org/jira/browse/FLINK-12541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16859718#comment-16859718 ] Dian Fu commented on FLINK-12541: - [~sunjincheng121] Thanks a lot for the suggestions. I have created a ticket FLINK-12788 for the part 2 changes. > Add deploy a Python Flink job and session cluster on Kubernetes support. > > > Key: FLINK-12541 > URL: https://issues.apache.org/jira/browse/FLINK-12541 > Project: Flink > Issue Type: Sub-task > Components: API / Python, Runtime / REST >Affects Versions: 1.9.0 >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > Add deploy a Python Flink job and session cluster on Kubernetes support. > We need to have the same deployment step as the Java job. Please see: > [https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12789) Fix java docs in UserDefinedAggregateFunction
Hequn Cheng created FLINK-12789: --- Summary: Fix java docs in UserDefinedAggregateFunction Key: FLINK-12789 URL: https://issues.apache.org/jira/browse/FLINK-12789 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: Hequn Cheng Assignee: Hequn Cheng We use {{UserDefinedAggregateFunction}} as the base class for {{TableAggregateFunction}} and {{AggregateFunction}}. However, the java docs in {{UserDefinedAggregateFunction}} are dedicated only to {{AggregateFunction}}.
[GitHub] [flink] waywtd opened a new pull request #8672: blink first commit
waywtd opened a new pull request #8672: blink first commit URL: https://github.com/apache/flink/pull/8672 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new 
feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[GitHub] [flink] HuangXingBo commented on a change in pull request #8671: [FLINK-12787][python] Allow to specify directory in option -pyfs
HuangXingBo commented on a change in pull request #8671: [FLINK-12787][python] Allow to specify directory in option -pyfs URL: https://github.com/apache/flink/pull/8671#discussion_r291886091 ## File path: flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java ## @@ -119,9 +119,9 @@ protected ProgramOptions(CommandLine line) throws CliArgsException { // PythonDriver args: pym ${py-module} pyfs ${py-files} [optional] ${other args}. // e.g. -pym AAA.fun -pyfs AAA.zip(CLI cmd) > pym AAA.fun -pyfs AAA.zip(PythonDriver args) String[] newArgs = new String[args.length + 4]; - newArgs[0] = PYMODULE_OPTION.getOpt(); + newArgs[0] = "-" + PYMODULE_OPTION.getOpt(); newArgs[1] = line.getOptionValue(PYMODULE_OPTION.getOpt()); - newArgs[2] = PYFILES_OPTION.getOpt(); + newArgs[2] = "-" + PYFILES_OPTION.getOpt(); Review comment: The code comment describing this args transformation should be updated as well, or the comment can be deleted.
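The diff above re-adds the leading dash that commons-cli strips from option names before the parsed options are forwarded to the Python driver as raw args. A minimal sketch of that rebuilding step (the helper name and class are illustrative assumptions, not Flink's actual code):

```java
// Rebuild driver args from already-parsed CLI options. commons-cli returns
// option names without the leading "-", so the dash must be re-added before
// the args are parsed again downstream. Names here are illustrative.
final class PythonArgs {
    static String[] forwardPythonArgs(String pyModule, String pyFiles, String[] userArgs) {
        String[] newArgs = new String[userArgs.length + 4];
        newArgs[0] = "-pym";   // re-prefix the stripped short option name
        newArgs[1] = pyModule;
        newArgs[2] = "-pyfs";
        newArgs[3] = pyFiles;
        // Remaining user args are passed through unchanged.
        System.arraycopy(userArgs, 0, newArgs, 4, userArgs.length);
        return newArgs;
    }
}
```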
[GitHub] [flink] HuangXingBo commented on a change in pull request #8671: [FLINK-12787][python] Allow to specify directory in option -pyfs
HuangXingBo commented on a change in pull request #8671: [FLINK-12787][python] Allow to specify directory in option -pyfs URL: https://github.com/apache/flink/pull/8671#discussion_r291886132 ## File path: flink-python/src/main/java/org/apache/flink/python/client/PythonDriverOptionsParserFactory.java ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.python.client; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.entrypoint.FlinkParseException; +import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Parser factory which generates a {@link PythonDriverOptions} from a given + * list of command line arguments. 
+ */ +final class PythonDriverOptionsParserFactory implements ParserResultFactory<PythonDriverOptions> { + + private static final Option PY_OPTION = Option.builder("py") + .longOpt("python") + .required(false) + .hasArg(true) + .argName("entrypoint python file") + .desc("Python script with the program entry point. " + + "The dependent resources can be configured with the `--pyFiles` option.") + .build(); + + private static final Option PYMODULE_OPTION = Option.builder("pym") + .longOpt("pyModule") + .required(false) + .hasArg(true) + .argName("entrypoint module name") + .desc("Python module with the program entry point. " + + "This option must be used in conjunction with `--pyFiles`.") + .build(); + + private static final Option PYFILES_OPTION = Option.builder("pyfs") + .longOpt("pyFiles") + .required(false) + .hasArg(true) + .argName("entrypoint python file") + .desc("Attach custom python files for job. " + + "Comma can be used as the separator to specify multiple files. " + + "The standard python resource file suffixes such as .py/.egg/.zip are all supported." + + "(eg: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)") + .build(); + + @Override + public Options getOptions() { + final Options options = new Options(); + options.addOption(PY_OPTION); + options.addOption(PYMODULE_OPTION); + options.addOption(PYFILES_OPTION); + return options; + } + + @Override + public PythonDriverOptions createResult(@Nonnull CommandLine commandLine) throws FlinkParseException { + String entrypointModule = null; + final List<Path> pythonLibFiles = new ArrayList<>(); + + if (commandLine.hasOption(PY_OPTION.getOpt()) && commandLine.hasOption(PYMODULE_OPTION.getOpt())) { + throw new FlinkParseException("Cannot use options -py and -pym simultaneously."); Review comment: Similar logic already exists in ProgramOptions. Could the duplicated code in ProgramOptions be deleted?
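The `createResult` check in the file above rejects `-py` together with `-pym`, and the `--pyFiles` help text states that `-pym` must be used in conjunction with it. Those constraints can be sketched as a small validation helper (a simplified stand-in, not the actual `PythonDriverOptionsParserFactory` logic):

```java
// Validate mutually-exclusive and dependent Python CLI options.
// hasPy: -py given; hasPyModule: -pym given; hasPyFiles: -pyfs given.
// A simplified stand-in; the real code throws FlinkParseException instead.
final class PythonOptionChecks {
    static void validate(boolean hasPy, boolean hasPyModule, boolean hasPyFiles) {
        if (hasPy && hasPyModule) {
            throw new IllegalArgumentException(
                "Cannot use options -py and -pym simultaneously.");
        }
        if (hasPyModule && !hasPyFiles) {
            throw new IllegalArgumentException(
                "Option -pym must be used in conjunction with -pyfs.");
        }
    }
}
```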
[GitHub] [flink] HuangXingBo commented on a change in pull request #8671: [FLINK-12787][python] Allow to specify directory in option -pyfs
HuangXingBo commented on a change in pull request #8671: [FLINK-12787][python] Allow to specify directory in option -pyfs URL: https://github.com/apache/flink/pull/8671#discussion_r291886055 ## File path: flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java ## @@ -95,14 +95,14 @@ protected ProgramOptions(CommandLine line) throws CliArgsException { int argIndex; if (line.hasOption(PYFILES_OPTION.getOpt())) { newArgs = new String[args.length + 4]; - newArgs[2] = PYFILES_OPTION.getOpt(); + newArgs[2] = "-" + PYFILES_OPTION.getOpt(); Review comment: The code comment describing this args transformation should be updated as well, or the comment can be deleted.
[GitHub] [flink] docete commented on a change in pull request #8626: [FLINK-12742] Add insert into partition grammar as hive dialect
docete commented on a change in pull request #8626: [FLINK-12742] Add insert into partition grammar as hive dialect URL: https://github.com/apache/flink/pull/8626#discussion_r291885792 ## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dml/RichSqlInsert.java ## @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.sql.parser.dml; + +import org.apache.flink.sql.parser.SqlProperty; +import org.apache.flink.sql.parser.ddl.ExtendedSqlNode; +import org.apache.flink.sql.parser.error.SqlParseException; + +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlInsert; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.SqlUtil; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.type.SqlTypeUtil; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql.validate.SqlValidatorNamespace; +import org.apache.calcite.sql.validate.SqlValidatorScope; +import org.apache.calcite.sql.validate.SqlValidatorTable; +import org.apache.calcite.sql.validate.SqlValidatorUtil; +import org.apache.calcite.util.Pair; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.calcite.util.Static.RESOURCE; + +/** An {@link SqlInsert} that have some extension functions like partition, overwrite. **/ Review comment: Seems we did not support INSERT OVERWRITE in the former Parser.
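For context on the grammar being added: a Hive-style `INSERT INTO ... PARTITION` statement carries a static partition spec whose column order must be preserved. A minimal sketch of collecting such a spec (the helper below is an illustrative assumption, not `RichSqlInsert`'s API):

```java
import java.util.LinkedHashMap;

// Collect a static partition spec, e.g. from
//   INSERT INTO t PARTITION (dt='2019-06-09', country='US') SELECT ...
// A LinkedHashMap preserves the order in which the partition columns appear.
final class PartitionSpecs {
    static LinkedHashMap<String, String> staticPartitionSpec(String[][] pairs) {
        LinkedHashMap<String, String> spec = new LinkedHashMap<>();
        for (String[] p : pairs) {
            spec.put(p[0], p[1]); // partition column -> literal value
        }
        return spec;
    }
}
```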
[GitHub] [flink] dianfu commented on issue #8609: [FLINK-12541][container][python] Add support for Python jobs in build script
dianfu commented on issue #8609: [FLINK-12541][container][python] Add support for Python jobs in build script URL: https://github.com/apache/flink/pull/8609#issuecomment-500295981 @tillrohrmann Thanks a lot for your review. Your suggestion makes much sense to me. I have created a dedicated JIRA [FLINK-12788](https://issues.apache.org/jira/browse/FLINK-12788) for this PR. Regarding the changes to `StandaloneJobClusterEntrypoint`, I agree that there should not be special logic for Python. I will revert that part of the changes. @sunjincheng121 Thanks a lot for your review. I have created a separate JIRA [FLINK-12787](https://issues.apache.org/jira/browse/FLINK-12787) for the PythonDriver improvement. Then we can focus this PR on the build script changes for Python jobs support. Will update the PR later today.
[jira] [Created] (FLINK-12788) Add support to run a Python job-specific cluster on Kubernetes
Dian Fu created FLINK-12788: --- Summary: Add support to run a Python job-specific cluster on Kubernetes Key: FLINK-12788 URL: https://issues.apache.org/jira/browse/FLINK-12788 Project: Flink Issue Type: Sub-task Components: API / Python, Deployment / Docker Reporter: Dian Fu Assignee: Dian Fu As discussed in FLINK-12541, we need to support running a Python job-specific cluster on Kubernetes. To support this, we need to improve the job-specific docker image build scripts to support Python Table API jobs.
[GitHub] [flink] flinkbot commented on issue #8671: [FLINK-12787][python] Allow to specify directory in option -pyfs
flinkbot commented on issue #8671: [FLINK-12787][python] Allow to specify directory in option -pyfs URL: https://github.com/apache/flink/pull/8671#issuecomment-500293007 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] dianfu opened a new pull request #8671: [FLINK-12787][python] Allow to specify directory in option -pyfs
dianfu opened a new pull request #8671: [FLINK-12787][python] Allow to specify directory in option -pyfs URL: https://github.com/apache/flink/pull/8671 ## What is the purpose of the change *This pull request adds support for specifying a directory in option `-pyfs`* ## Brief change log - *Add support for specifying a directory in option `-pyfs`* - *Improve PythonDriver to use CommandLineParser for the arguments parsing* ## Verifying this change This change added tests and can be verified as follows: - *Existing tests in PythonDriverTest* - *Added tests in PythonDriverOptionsParserFactoryTest and PythonEnvUtilsTest* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
[jira] [Updated] (FLINK-12787) Allow to specify directory in option -pyfs
[ https://issues.apache.org/jira/browse/FLINK-12787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12787: --- Labels: pull-request-available (was: ) > Allow to specify directory in option -pyfs > -- > > Key: FLINK-12787 > URL: https://issues.apache.org/jira/browse/FLINK-12787 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > > Currently only files can be specified in option `-pyfs`; we want to improve it > to also allow specifying directories in option `-pyfs`.
[jira] [Created] (FLINK-12787) Allow to specify directory in option -pyfs
Dian Fu created FLINK-12787: --- Summary: Allow to specify directory in option -pyfs Key: FLINK-12787 URL: https://issues.apache.org/jira/browse/FLINK-12787 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Currently only files can be specified in option `-pyfs`; we want to improve it to also allow specifying directories in option `-pyfs`.
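One plausible way to realize this improvement (a sketch under stated assumptions, not the actual Flink change; the class and method names are invented) is to treat each `-pyfs` entry as either a file or a directory, and when it is a directory, recursively collect the Python resource files inside it, reusing the suffixes the `--pyFiles` help text already lists (.py/.egg/.zip):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Expand a -pyfs entry: a directory is walked and its Python resource files
// are collected; a plain file is passed through unchanged. Illustrative only.
final class PyFsExpander {
    static List<Path> expand(Path entry) throws IOException {
        List<Path> result = new ArrayList<>();
        if (Files.isDirectory(entry)) {
            try (Stream<Path> walk = Files.walk(entry)) {
                walk.filter(Files::isRegularFile)
                    .filter(PyFsExpander::isPythonResource)
                    .forEach(result::add);
            }
        } else {
            result.add(entry);
        }
        return result;
    }

    private static boolean isPythonResource(Path p) {
        String name = p.getFileName().toString();
        return name.endsWith(".py") || name.endsWith(".egg") || name.endsWith(".zip");
    }
}
```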
[GitHub] [flink] sunjincheng121 removed a comment on issue #8609: [FLINK-12541][container][python] Add support for Python jobs in build script
sunjincheng121 removed a comment on issue #8609: [FLINK-12541][container][python] Add support for Python jobs in build script URL: https://github.com/apache/flink/pull/8609#issuecomment-500290745 Thanks for the PR @dianfu. I think the suggestion from @tillrohrmann makes more sense to me: we should let the PR have its own JIRA. Furthermore, the improvement of `pyfs` can also go in a new PR (with a new JIRA). BTW: please rebase the code, and I will have another review :) What do you think?
[GitHub] [flink] sunjincheng121 commented on issue #8609: [FLINK-12541][container][python] Add support for Python jobs in build script
sunjincheng121 commented on issue #8609: [FLINK-12541][container][python] Add support for Python jobs in build script URL: https://github.com/apache/flink/pull/8609#issuecomment-500290745 Thanks for the PR @dianfu I think the suggestion from @tillrohrmann makes more sense to me. We should let the PR have its own JIRA; furthermore, the improvement of `pyfs` can also go in a new PR (with a new JIRA). What do you think?
[GitHub] [flink] sunjincheng121 edited a comment on issue #8609: [FLINK-12541][container][python] Add support for Python jobs in build script
sunjincheng121 edited a comment on issue #8609: [FLINK-12541][container][python] Add support for Python jobs in build script URL: https://github.com/apache/flink/pull/8609#issuecomment-500290745 Thanks for the PR @dianfu I think the suggestion from @tillrohrmann makes more sense to me. We should let the PR have its own JIRA; furthermore, the improvement of `pyfs` can also go in a new PR (with a new JIRA). BTW: please rebase the code, and I will have another review :) What do you think?
[GitHub] [flink] dianfu commented on issue #8641: [FLINK-12757][python] Improves the word_count example to use the descriptor API
dianfu commented on issue #8641: [FLINK-12757][python] Improves the word_count example to use the descriptor API URL: https://github.com/apache/flink/pull/8641#issuecomment-500289996 @sunjincheng121 Thanks a lot for the review. Updated accordingly.
[GitHub] [flink] flinkbot commented on issue #8670: [hotfix][utils] Replace implemented FutureUtils#toJava with lib method
flinkbot commented on issue #8670: [hotfix][utils] Replace implemented FutureUtils#toJava with lib method URL: https://github.com/apache/flink/pull/8670#issuecomment-500289795 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] TisonKun opened a new pull request #8670: [hotfix][utils] Replace implemented FutureUtils#toJava with lib method
TisonKun opened a new pull request #8670: [hotfix][utils] Replace implemented FutureUtils#toJava with lib method URL: https://github.com/apache/flink/pull/8670 ## What is the purpose of the change Replace implemented `FutureUtils#toJava` with scala's lib method. ## Brief change log Replace `FutureUtils#toJava` with `scala.compat.java8.FutureConverters#toJava`. For the unchecked upcast, ref https://github.com/scala/scala-java8-compat/pull/152 but it should be ok in this case. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) cc @zentol @GJL
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8641: [FLINK-12757][python] Improves the word_count example to use the descriptor API
sunjincheng121 commented on a change in pull request #8641: [FLINK-12757][python] Improves the word_count example to use the descriptor API URL: https://github.com/apache/flink/pull/8641#discussion_r291877199 ## File path: flink-python/pyflink/table/examples/batch/word_count.py ## @@ -39,41 +34,40 @@ def word_count(): "License you may not use this file except in compliance " \ "with the License" -with open(source_path, 'w') as f: -for word in content.split(" "): -f.write(",".join([word, "1"])) -f.write("\n") -f.flush() -f.close() - t_config = TableConfig.Builder().as_batch_execution().build() t_env = TableEnvironment.create(t_config) -field_names = ["word", "cout"] -field_types = [DataTypes.STRING(), DataTypes.BIGINT()] - -# register Orders table in table environment -t_env.register_table_source( -"Word", -CsvTableSource(source_path, field_names, field_types)) - # register Results table in table environment tmp_dir = tempfile.gettempdir() -tmp_csv = tmp_dir + '/streaming2.csv' -if os.path.isfile(tmp_csv): -os.remove(tmp_csv) +result_dir = tmp_dir + '/result' +if os.path.exists(result_dir): +try: +shutil.rmtree(result_dir) Review comment: we should check the file by follows code: ``` if os.path.isfile(result_dir): os.remove(result_dir) else: shutil.rmtree(result_dir) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
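The reviewer's point in the snippet above is that `os.path.exists` is true for both plain files and directories, but `shutil.rmtree` only works on directories, so the removal must branch on the path's type. A small self-contained sketch of that defensive cleanup (the helper name is ours, not from the PR):

```python
import os
import shutil


def remove_path(path):
    """Remove `path` whether it is a plain file or a directory,
    mirroring the isfile/rmtree branching suggested in the review."""
    if not os.path.exists(path):
        return
    if os.path.isfile(path):
        os.remove(path)  # shutil.rmtree would raise NotADirectoryError here
    else:
        shutil.rmtree(path)  # os.remove would raise IsADirectoryError here
```

Calling `remove_path(result_dir)` before writing results would then be safe regardless of what a previous run left behind.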
[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package URL: https://github.com/apache/flink/pull/8632#discussion_r291876341 ## File path: flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java ## @@ -47,14 +70,33 @@ * @throws RuntimeException if the Params doesn't contains the specific parameter, while the * param is not optional but has no default value in the {@code info} */ - @SuppressWarnings("unchecked") public V get(ParamInfo info) { - V value = (V) paramMap.getOrDefault(info.getName(), info.getDefaultValue()); - if (value == null && !info.isOptional() && !info.hasDefaultValue()) { - throw new RuntimeException(info.getName() + - " not exist which is not optional and don't have a default value"); + Stream stream = getParamNameAndAlias(info) + .filter(this.params::containsKey) + .map(x -> this.params.get(x)) + .map(x -> valueFromJson(x, info.getValueClass())) + .limit(1); Review comment: Without the stream object we would need more code. The value of a parameter may be null, and `findFirst` is not null-friendly, so we chose `limit(1)` instead. When the user sets null as the value of a param, calling `findFirst` on the Stream throws a NullPointerException. Looking into the code, `findFirst` returns an `Optional`, but `Optional` only accepts a non-null value as a constructor argument. For example, the following code throws NullPointerException: Stream.of(null, "2").findFirst();
[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package URL: https://github.com/apache/flink/pull/8632#discussion_r291874460 ## File path: flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java ## @@ -93,59 +126,105 @@ * @param the type of the specific parameter */ public void remove(ParamInfo info) { - paramMap.remove(info.getName()); + params.remove(info.getName()); + for (String a : info.getAlias()) { + params.remove(a); + } } - /** -* Creates and returns a deep clone of this Params. -* -* @return a deep clone of this Params -*/ - public Params clone() { - Params newParams = new Params(); - newParams.paramMap.putAll(this.paramMap); - return newParams; + public boolean contains(ParamInfo paramInfo) { + return params.containsKey(paramInfo.getName()) || + Arrays.stream(paramInfo.getAlias()).anyMatch(params::containsKey); } /** -* Returns a json containing all parameters in this Params. The json should be human-readable if -* possible. +* Creates and returns a deep clone of this Params. * * @return a json containing all parameters in this Params */ public String toJson() { - ObjectMapper mapper = new ObjectMapper(); - Map stringMap = new HashMap<>(); try { - for (Map.Entry e : paramMap.entrySet()) { - stringMap.put(e.getKey(), mapper.writeValueAsString(e.getValue())); - } - return mapper.writeValueAsString(stringMap); + return mapper.writeValueAsString(params); } catch (JsonProcessingException e) { throw new RuntimeException("Failed to serialize params to json", e); } } /** * Restores the parameters from the given json. The parameters should be exactly the same with -* the one who was serialized to the input json after the restoration. The class mapping of the -* parameters in the json is required because it is hard to directly restore a param of a user -* defined type. Params will be treated as String if it doesn't exist in the {@code classMap}. 
+* the one who was serialized to the input json after the restoration. * -* @param json the json String to restore from -* @param classMap the classes of the parameters contained in the json +* @param json the json String to restore from */ @SuppressWarnings("unchecked") - public void loadJson(String json, Map> classMap) { + public void loadJson(String json) { ObjectMapper mapper = new ObjectMapper(); + Map params; try { - Map m = mapper.readValue(json, Map.class); - for (Map.Entry e : m.entrySet()) { - Class valueClass = classMap.getOrDefault(e.getKey(), String.class); - paramMap.put(e.getKey(), mapper.readValue(e.getValue(), valueClass)); + params = mapper.readValue(json, Map.class); + } catch (IOException e) { + throw new RuntimeException("Failed to deserialize json:" + json, e); + } + this.params.clear(); + this.params.putAll(params); + } + + public static Params fromJson(String json) { + Params params = new Params(); + params.loadJson(json); + return params; + } + + public Params merge(Params otherParams) { + if (otherParams != null) { + this.params.putAll(otherParams.params); + } + return this; + } + + @Override + public Params clone() { + Params newParams = new Params(); + newParams.params.putAll(this.params); + return newParams; + } + + private void assertMapperInited() { + if (mapper == null) { + mapper = new ObjectMapper(); + } + } + + private String valueToJson(Object value) { + assertMapperInited(); + try { + if (value == null) { + return null; + } + return mapper.writeValueAsString(value); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize to json:" + value, e); +
[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package URL: https://github.com/apache/flink/pull/8632#discussion_r291874378 ## File path: flink-ml-parent/flink-ml/src/main/java/org/apache/flink/ml/params/shared/colname/HasCategoricalColNames.java ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.params.shared.colname; + +import org.apache.flink.ml.api.misc.param.ParamInfo; +import org.apache.flink.ml.api.misc.param.ParamInfoFactory; +import org.apache.flink.ml.params.BaseWithParam; + +/** + * Names of the categorical columns used for training in the input table. + */ +public interface HasCategoricalColNames extends BaseWithParam { + + ParamInfo CATEGORICAL_COL_NAMES = ParamInfoFactory + .createParamInfo("categoricalColNames", String[].class) + .setDescription("Names of the categorical columns used for training in the input table") + .setHasDefaultValue(new String[] {}) Review comment: I removed this default value, it is not necessary and maybe confused for user. This is an automated message from the Apache Git Service. 
[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package URL: https://github.com/apache/flink/pull/8632#discussion_r291873818 ## File path: flink-ml-parent/flink-ml/src/main/java/org/apache/flink/ml/params/shared/HasVectorSizeDv100.java ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.ml.params.shared; + +import org.apache.flink.ml.api.misc.param.ParamInfo; +import org.apache.flink.ml.api.misc.param.ParamInfoFactory; +import org.apache.flink.ml.params.BaseWithParam; + +/** + * Vector size of embedding. + */ +public interface HasVectorSizeDv100 extends BaseWithParam { + /** +* @cn embedding的向量长度 Review comment: Thanks, removed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package URL: https://github.com/apache/flink/pull/8632#discussion_r291873656 ## File path: flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java ## @@ -47,14 +70,33 @@ * @throws RuntimeException if the Params doesn't contains the specific parameter, while the * param is not optional but has no default value in the {@code info} */ - @SuppressWarnings("unchecked") public V get(ParamInfo info) { - V value = (V) paramMap.getOrDefault(info.getName(), info.getDefaultValue()); - if (value == null && !info.isOptional() && !info.hasDefaultValue()) { - throw new RuntimeException(info.getName() + - " not exist which is not optional and don't have a default value"); + Stream stream = getParamNameAndAlias(info) Review comment: I changed the name to "paramValue".
[GitHub] [flink] xuyang1706 commented on issue #8632: [FLINK-12744][ml] add shared params in ml package
xuyang1706 commented on issue #8632: [FLINK-12744][ml] add shared params in ml package URL: https://github.com/apache/flink/pull/8632#issuecomment-500284364 > Hi, @xuyang1706 thanks for your work! > I left a few comments on your changes, could you look into them when you have a chance. > Thanks! Hi @ex00, sorry for the late reply, I just came back from my vacation. Thanks for your helpful comments! I have updated the code and replied to them inline.
[GitHub] [flink] lirui-apache commented on a change in pull request #8616: [FLINK-12718][hive] allow users to specify hive-site.xml location to configure hive metastore client in HiveCatalog
lirui-apache commented on a change in pull request #8616: [FLINK-12718][hive] allow users to specify hive-site.xml location to configure hive metastore client in HiveCatalog URL: https://github.com/apache/flink/pull/8616#discussion_r291872326 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -109,27 +114,51 @@ private HiveMetastoreClientWrapper client; - public HiveCatalog(String catalogName, String hivemetastoreURI) { - this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI)); + public HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveSiteFilePath) { + this(catalogName, + defaultDatabase == null ? DEFAULT_DB : defaultDatabase, + getHiveConf(hiveSiteFilePath)); } - public HiveCatalog(String catalogName, HiveConf hiveConf) { - this(catalogName, DEFAULT_DB, hiveConf); + public HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable URL hiveSiteUrl) { + this(catalogName, + defaultDatabase == null ? DEFAULT_DB : defaultDatabase, + getHiveConf(hiveSiteUrl)); } - public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) { - super(catalogName, defaultDatabase); - this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null"); + public HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable HiveConf hiveConf) { + super(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase); + + this.hiveConf = hiveConf == null ? 
getHiveConf("") : hiveConf; LOG.info("Created HiveCatalog '{}'", catalogName); } - private static HiveConf getHiveConf(String hiveMetastoreURI) { - checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty"); + private static HiveConf getHiveConf(String filePath) { + + URL url = null; + + if (!StringUtils.isNullOrWhitespaceOnly(filePath)) { + try { + url = new File(filePath).toURI().toURL(); + + LOG.info("Successfully loaded '{}'", filePath); + + } catch (MalformedURLException e) { + throw new CatalogException( + String.format("Failed to get hive-site.xml from the given path '%s'", filePath), e); + } + } + + return getHiveConf(url); + } + + private static HiveConf getHiveConf(URL hiveSiteUrl) { + LOG.info("Setting hive-site location as {}", hiveSiteUrl); + + HiveConf.setHiveSiteLocation(hiveSiteUrl); Review comment: Wondering will this incur any concurrency issue, e.g. when user access tables from different Hive catalogs? I think at least some constructors of HiveConf is no longer safe to use after this change, like `HiveConf()` and `HiveConf(Class cls)` which automatically load the specified hive-site.xml. I think it's better not to rely on static fields of HiveConf. Instead, we can load the hive-site (perhaps `Properties::loadFromXML`?) ourselves and `HiveConf::hiveSiteURL` should always be set to null. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
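The reviewer's suggestion above is to load the hive-site file directly rather than routing it through HiveConf's shared static field. Note that `hive-site.xml` uses Hadoop's `<configuration>/<property>` layout rather than the `java.util.Properties` XML DTD, so "loading it ourselves" amounts to a small XML walk. A hedged, language-agnostic illustration of that parse (Python here purely for brevity; the real fix would live in the Java catalog code):

```python
import xml.etree.ElementTree as ET


def parse_hadoop_conf(xml_text):
    """Parse a Hadoop-style configuration document, i.e.
    <configuration><property><name>k</name><value>v</value></property>...</configuration>,
    into a plain dict -- no shared static state involved."""
    props = {}
    root = ET.fromstring(xml_text)
    for prop in root.findall("property"):
        name = prop.findtext("name")
        value = prop.findtext("value")
        if name is not None:
            props[name] = value
    return props
```

Each catalog instance keeping its own parsed dict (or its own `HiveConf` populated from it) avoids the concurrency hazard of a process-wide hive-site location.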
[jira] [Updated] (FLINK-12765) Bookkeeping of available resources of allocated slots in SlotPool.
[ https://issues.apache.org/jira/browse/FLINK-12765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tony Xintong Song updated FLINK-12765: -- Summary: Bookkeeping of available resources of allocated slots in SlotPool. (was: JobMaster calculates resource needs and requests slot for the entire slot sharing group.) > Bookkeeping of available resources of allocated slots in SlotPool. > -- > > Key: FLINK-12765 > URL: https://issues.apache.org/jira/browse/FLINK-12765 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.8.0, 1.9.0 >Reporter: Tony Xintong Song >Assignee: Yun Gao >Priority: Major > Fix For: 1.9.0 > > > In this version, a task will always request a slot with its own resource need. > If the resource need is less than the default slot resource, it will always > be allocated to a default-sized slot. > > The extra resources in the slot leave a chance for other tasks within the > same slot sharing group to fit in. To take these chances, the SlotPool will > maintain the available resources of each allocated slot. The available resource of an > allocated slot should always be the total resource of the slot minus the > resources of tasks already assigned onto the slot. In this way, the SlotPool > would be able to determine whether another task can fit into the slot. If a > task cannot fit into the slot, for a slot sharing group the SlotPool should > request another slot from the ResourceManager, and for a colocation group it > should fail the job.
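The bookkeeping rule described above — available = total minus already-assigned, and a task fits only if its need is within the remainder — can be sketched as a toy model. Class and method names here are illustrative, not Flink's SlotPool API:

```python
class SlotBookkeeping:
    """Toy model of tracking the resources still available in one
    allocated slot: available = total resource - assigned resources."""

    def __init__(self, total_cpu, total_mem_mb):
        self.available_cpu = total_cpu
        self.available_mem_mb = total_mem_mb

    def try_assign(self, task_cpu, task_mem_mb):
        """Assign a task onto this slot if it fits; return True on success.
        On False, the caller (the pool) would request a new slot from the
        ResourceManager (slot sharing) or fail the job (colocation)."""
        if task_cpu <= self.available_cpu and task_mem_mb <= self.available_mem_mb:
            self.available_cpu -= task_cpu
            self.available_mem_mb -= task_mem_mb
            return True
        return False
```

For example, a 2-CPU / 1024 MB slot can take two 1-CPU / 512 MB tasks, after which any further task is rejected and a new slot request would be issued.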
[jira] [Updated] (FLINK-12744) ML common parameters
[ https://issues.apache.org/jira/browse/FLINK-12744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xu Yang updated FLINK-12744: Description: We defined some common-used parameters for machine-learning algorithms. - *add ML common parameters* - *this is sub pr of #8586* - *change behavior when use default constructor of param factory* - *add shared params in ml package* - *add flink-ml module* was:We defined some common-used parameters for machine-learning algorithms. > ML common parameters > > > Key: FLINK-12744 > URL: https://issues.apache.org/jira/browse/FLINK-12744 > Project: Flink > Issue Type: Sub-task >Reporter: Xu Yang >Assignee: Xu Yang >Priority: Major > > We defined some common-used parameters for machine-learning algorithms. > > - *add ML common parameters* > - *this is sub pr of #8586* > - *change behavior when use default constructor of param factory* > - *add shared params in ml package* > - *add flink-ml module* -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12744) ML common parameters
[ https://issues.apache.org/jira/browse/FLINK-12744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xu Yang updated FLINK-12744: Description: We defined some common-used parameters for machine-learning algorithms. - *add ML common parameters* - *this is sub pr of #8586* - *change behavior when use default constructor of param factory* - *add shared params in ml package* - *add flink-ml module* was: We defined some common-used parameters for machine-learning algorithms. - *add ML common parameters* - *this is sub pr of #8586* - *change behavior when use default constructor of param factory* - *add shared params in ml package* - *add flink-ml module* > ML common parameters > > > Key: FLINK-12744 > URL: https://issues.apache.org/jira/browse/FLINK-12744 > Project: Flink > Issue Type: Sub-task >Reporter: Xu Yang >Assignee: Xu Yang >Priority: Major > > We defined some common-used parameters for machine-learning algorithms. > - *add ML common parameters* > - *this is sub pr of #8586* > - *change behavior when use default constructor of param factory* > - *add shared params in ml package* > - *add flink-ml module* -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12744) ML common parameters
[ https://issues.apache.org/jira/browse/FLINK-12744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xu Yang updated FLINK-12744: Description: We defined some common-used parameters for machine-learning algorithms. - *add ML common parameters* - *change behavior when use default constructor of param factory* - *add shared params in ml package* - *add flink-ml module* was: We defined some common-used parameters for machine-learning algorithms. - *add ML common parameters* - *this is sub pr of #8586* - *change behavior when use default constructor of param factory* - *add shared params in ml package* - *add flink-ml module* > ML common parameters > > > Key: FLINK-12744 > URL: https://issues.apache.org/jira/browse/FLINK-12744 > Project: Flink > Issue Type: Sub-task >Reporter: Xu Yang >Assignee: Xu Yang >Priority: Major > > We defined some common-used parameters for machine-learning algorithms. > - *add ML common parameters* > - *change behavior when use default constructor of param factory* > - *add shared params in ml package* > - *add flink-ml module* -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] sunjincheng121 commented on issue #8669: [FLINK-11147][table][docs] Add documentation for TableAggregate Function
sunjincheng121 commented on issue #8669: [FLINK-11147][table][docs] Add documentation for TableAggregate Function URL: https://github.com/apache/flink/pull/8669#issuecomment-500279807 @flinkbot approve description
[GitHub] [flink] JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator
JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator URL: https://github.com/apache/flink/pull/8629#discussion_r291869732 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/StreamingJoinOperator.java ## @@ -0,0 +1,476 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join.stream; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.BinaryRow; +import org.apache.flink.table.dataformat.GenericRow; +import org.apache.flink.table.dataformat.JoinedRow; +import org.apache.flink.table.dataformat.util.BaseRowUtil; +import org.apache.flink.table.generated.GeneratedJoinCondition; +import org.apache.flink.table.generated.JoinCondition; +import org.apache.flink.table.runtime.join.NullAwareJoinHelper; +import org.apache.flink.table.runtime.join.stream.state.JoinInputSideSpec; +import org.apache.flink.table.runtime.join.stream.state.JoinRecordStateView; +import org.apache.flink.table.runtime.join.stream.state.JoinRecordStateViews; +import org.apache.flink.table.runtime.join.stream.state.OuterJoinRecordStateView; +import org.apache.flink.table.runtime.join.stream.state.OuterJoinRecordStateViews; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; +import org.apache.flink.util.IterableIterator; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Streaming unbounded Join operator which support INNER/LEFT/RIGHT/FULL JOIN. 
+ */ +public class StreamingJoinOperator extends AbstractStreamOperator + implements TwoInputStreamOperator { + + private static final long serialVersionUID = -376944622236540545L; + + private final BaseRowTypeInfo leftType; + private final BaseRowTypeInfo rightType; + private final GeneratedJoinCondition generatedJoinCondition; + + private final JoinInputSideSpec leftInputSideSpec; + private final JoinInputSideSpec rightInputSideSpec; + + // whether left side is outer side, e.g. left is outer but right is not when LEFT OUTER JOIN + private final boolean leftIsOuter; + // whether right side is outer side, e.g. right is outer but left is not when RIGHT OUTER JOIN + private final boolean rightIsOuter; + + /** +* Should filter null keys. +*/ + private final int[] nullFilterKeys; + + /** +* No keys need to filter null. +*/ + private final boolean nullSafe; + + /** +* Filter null to all keys. +*/ + private final boolean filterAllNulls; + + private final long minRetentionTime; + private final boolean stateCleaningEnabled; + + private transient JoinCondition joinCondition; + private transient TimestampedCollector collector; + + private transient JoinedRow outRow; + private transient BaseRow leftNullRow; + private transient BaseRow rightNullRow; + + // left join state + private transient JoinRecordStateView leftRecordStateView; + // right join state + private transient JoinRecordStateView rightRecordStateView; + + public StreamingJoinOperator( + BaseRowTypeInfo leftType, + BaseRowTypeInfo rightType, + GeneratedJoinCondition generatedJoinCondition, + JoinInputSideSpec leftInputSideSpec, + JoinInputSideSpec rightInputSideSpec, + boolean leftIsOuter, + boolean rightIsOuter, + boolean[] filterNullKeys, + long minRetentionTime) { + this.leftType = leftType; + this.rightType
[GitHub] [flink] JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator
JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator URL: https://github.com/apache/flink/pull/8629#discussion_r291867027

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoin.scala

```
@@ -104,6 +110,14 @@ class StreamExecJoin(
     new StreamExecJoin(cluster, traitSet, left, right, conditionExpr, joinType)
   }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super
+      .explainTerms(pw)
+      .item("leftInputSpec", analyzeJoinInput(left))
```

Review comment: Is `JoinInputSideSpec` useful in `explainTerms`?
[GitHub] [flink] JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator
JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator URL: https://github.com/apache/flink/pull/8629#discussion_r291867330

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoin.scala

```
@@ -123,7 +137,109 @@ class StreamExecJoin(
   override protected def translateToPlanInternal(
       tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
-    throw new TableException("Implements this")
+
+    val tableConfig = tableEnv.getConfig
+    val returnType = FlinkTypeFactory.toInternalRowType(getRowType).toTypeInfo
+
+    val leftTransform = getInputNodes.get(0).translateToPlan(tableEnv)
+      .asInstanceOf[StreamTransformation[BaseRow]]
+    val rightTransform = getInputNodes.get(1).translateToPlan(tableEnv)
+      .asInstanceOf[StreamTransformation[BaseRow]]
+
+    val leftType = leftTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
+    val rightType = rightTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
+
+    val (leftJoinKey, rightJoinKey) =
+      JoinUtil.checkAndGetJoinKeys(keyPairs, getLeft, getRight, allowEmptyKey = true)
+
+    val leftSelect = KeySelectorUtil.getBaseRowSelector(leftJoinKey, leftType)
+    val rightSelect = KeySelectorUtil.getBaseRowSelector(rightJoinKey, rightType)
+
+    val leftInputSpec = analyzeJoinInput(left)
+    val rightInputSpec = analyzeJoinInput(right)
+
+    val generatedCondition = JoinUtil.generateConditionFunction(
+      tableConfig,
+      cluster.getRexBuilder,
+      getJoinInfo,
+      TypeConverters.createInternalTypeFromTypeInfo(leftType),
+      TypeConverters.createInternalTypeFromTypeInfo(rightType))
+
+    if (joinType == JoinRelType.ANTI || joinType == JoinRelType.SEMI) {
+      throw new TableException("SEMI/ANTI Join is not supported yet.")
+    }
+
+    val leftIsOuter = joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL
+    val rightIsOuter = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL
+    val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
+
+    val operator = new StreamingJoinOperator(
+      leftType,
+      rightType,
+      generatedCondition,
+      leftInputSpec,
+      rightInputSpec,
+      leftIsOuter,
+      rightIsOuter,
+      filterNulls,
+      minRetentionTime)
+
+    val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
+      leftTransform,
+      rightTransform,
+      "Join",
```

Review comment: Is the transformation name too simple? We should add some condition information, like the batch operator does:

```
private def getOperatorName: String = if (getCondition != null) {
  val inFields = inputRowType.getFieldNames.toList
  s"SortMergeJoin(where: ${
    getExpressionString(getCondition, inFields, None, ExpressionFormat.Infix)})"
} else {
  "SortMergeJoin"
}
```
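To illustrate the reviewer's suggestion, here is a hedged sketch of how a more descriptive transformation name could be built for the streaming join, mirroring the batch `getOperatorName` pattern quoted above. The class and method names here are hypothetical, not the PR's actual code.

```java
// Hypothetical helper: build a descriptive operator name so the web UI
// shows e.g. "InnerJoin(where: =(a, b))" instead of a bare "Join".
public final class JoinNameUtil {
    private JoinNameUtil() {}

    static String operatorName(String joinType, String condition) {
        // Include the join condition only when one is present.
        return (condition != null && !condition.isEmpty())
            ? joinType + "(where: " + condition + ")"
            : joinType;
    }
}
```

The condition string would come from something like `getExpressionString(getCondition, inFields, None, ExpressionFormat.Infix)` in the planner, as in the quoted batch code.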
[GitHub] [flink] JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator
JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator URL: https://github.com/apache/flink/pull/8629#discussion_r291866569

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoin.scala

```
@@ -104,6 +110,14 @@ class StreamExecJoin(
     new StreamExecJoin(cluster, traitSet, left, right, conditionExpr, joinType)
   }
+
```

Review comment: remove empty line
[GitHub] [flink] JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator
JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator URL: https://github.com/apache/flink/pull/8629#discussion_r291867580

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoin.scala

```
@@ -123,7 +137,109 @@ class StreamExecJoin(
   override protected def translateToPlanInternal(
       tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
-    throw new TableException("Implements this")
+
+    val tableConfig = tableEnv.getConfig
+    val returnType = FlinkTypeFactory.toInternalRowType(getRowType).toTypeInfo
+
+    val leftTransform = getInputNodes.get(0).translateToPlan(tableEnv)
+      .asInstanceOf[StreamTransformation[BaseRow]]
+    val rightTransform = getInputNodes.get(1).translateToPlan(tableEnv)
+      .asInstanceOf[StreamTransformation[BaseRow]]
+
+    val leftType = leftTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
+    val rightType = rightTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
+
+    val (leftJoinKey, rightJoinKey) =
+      JoinUtil.checkAndGetJoinKeys(keyPairs, getLeft, getRight, allowEmptyKey = true)
+
+    val leftSelect = KeySelectorUtil.getBaseRowSelector(leftJoinKey, leftType)
+    val rightSelect = KeySelectorUtil.getBaseRowSelector(rightJoinKey, rightType)
+
+    val leftInputSpec = analyzeJoinInput(left)
+    val rightInputSpec = analyzeJoinInput(right)
+
+    val generatedCondition = JoinUtil.generateConditionFunction(
+      tableConfig,
+      cluster.getRexBuilder,
+      getJoinInfo,
+      TypeConverters.createInternalTypeFromTypeInfo(leftType),
+      TypeConverters.createInternalTypeFromTypeInfo(rightType))
+
+    if (joinType == JoinRelType.ANTI || joinType == JoinRelType.SEMI) {
+      throw new TableException("SEMI/ANTI Join is not supported yet.")
+    }
+
+    val leftIsOuter = joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL
+    val rightIsOuter = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL
+    val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
+
+    val operator = new StreamingJoinOperator(
+      leftType,
+      rightType,
+      generatedCondition,
+      leftInputSpec,
+      rightInputSpec,
+      leftIsOuter,
+      rightIsOuter,
+      filterNulls,
+      minRetentionTime)
+
+    val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
+      leftTransform,
+      rightTransform,
+      "Join",
+      operator,
+      returnType,
+      leftTransform.getParallelism)
+
+    if (leftJoinKey.isEmpty) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
+    }
+
+    // set KeyType and Selector for state
+    ret.setStateKeySelectors(leftSelect, rightSelect)
+    ret.setStateKeyType(leftSelect.asInstanceOf[ResultTypeQueryable[_]].getProducedType)
+    ret
   }

+  private def analyzeJoinInput(input: RelNode): JoinInputSideSpec = {
```

Review comment: This is somewhat similar to `needsUpdatesAsRetraction`; should we extract a shared method?
[GitHub] [flink] JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator
JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator URL: https://github.com/apache/flink/pull/8629#discussion_r291867419

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoin.scala

```
@@ -123,7 +137,109 @@ class StreamExecJoin(
   override protected def translateToPlanInternal(
       tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
-    throw new TableException("Implements this")
+
+    val tableConfig = tableEnv.getConfig
+    val returnType = FlinkTypeFactory.toInternalRowType(getRowType).toTypeInfo
+
+    val leftTransform = getInputNodes.get(0).translateToPlan(tableEnv)
+      .asInstanceOf[StreamTransformation[BaseRow]]
+    val rightTransform = getInputNodes.get(1).translateToPlan(tableEnv)
+      .asInstanceOf[StreamTransformation[BaseRow]]
+
+    val leftType = leftTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
+    val rightType = rightTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
+
+    val (leftJoinKey, rightJoinKey) =
+      JoinUtil.checkAndGetJoinKeys(keyPairs, getLeft, getRight, allowEmptyKey = true)
+
+    val leftSelect = KeySelectorUtil.getBaseRowSelector(leftJoinKey, leftType)
+    val rightSelect = KeySelectorUtil.getBaseRowSelector(rightJoinKey, rightType)
+
+    val leftInputSpec = analyzeJoinInput(left)
+    val rightInputSpec = analyzeJoinInput(right)
+
+    val generatedCondition = JoinUtil.generateConditionFunction(
+      tableConfig,
+      cluster.getRexBuilder,
+      getJoinInfo,
+      TypeConverters.createInternalTypeFromTypeInfo(leftType),
+      TypeConverters.createInternalTypeFromTypeInfo(rightType))
+
+    if (joinType == JoinRelType.ANTI || joinType == JoinRelType.SEMI) {
+      throw new TableException("SEMI/ANTI Join is not supported yet.")
+    }
+
+    val leftIsOuter = joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL
+    val rightIsOuter = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL
+    val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
+
+    val operator = new StreamingJoinOperator(
+      leftType,
+      rightType,
+      generatedCondition,
+      leftInputSpec,
+      rightInputSpec,
+      leftIsOuter,
+      rightIsOuter,
+      filterNulls,
+      minRetentionTime)
+
+    val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
+      leftTransform,
+      rightTransform,
+      "Join",
+      operator,
+      returnType,
+      leftTransform.getParallelism)
+
+    if (leftJoinKey.isEmpty) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
+    }
+
+    // set KeyType and Selector for state
+    ret.setStateKeySelectors(leftSelect, rightSelect)
+    ret.setStateKeyType(leftSelect.asInstanceOf[ResultTypeQueryable[_]].getProducedType)
```

Review comment: `leftSelect.asInstanceOf[ResultTypeQueryable[_]]` is a useless cast.
[GitHub] [flink] JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator
JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator URL: https://github.com/apache/flink/pull/8629#discussion_r291869144

## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/StreamingJoinOperator.java

```
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join.stream;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataformat.util.BaseRowUtil;
+import org.apache.flink.table.generated.GeneratedJoinCondition;
+import org.apache.flink.table.generated.JoinCondition;
+import org.apache.flink.table.runtime.join.NullAwareJoinHelper;
+import org.apache.flink.table.runtime.join.stream.state.JoinInputSideSpec;
+import org.apache.flink.table.runtime.join.stream.state.JoinRecordStateView;
+import org.apache.flink.table.runtime.join.stream.state.JoinRecordStateViews;
+import org.apache.flink.table.runtime.join.stream.state.OuterJoinRecordStateView;
+import org.apache.flink.table.runtime.join.stream.state.OuterJoinRecordStateViews;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.IterableIterator;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Streaming unbounded Join operator which support INNER/LEFT/RIGHT/FULL JOIN.
+ */
+public class StreamingJoinOperator extends AbstractStreamOperator<BaseRow>
+        implements TwoInputStreamOperator<BaseRow, BaseRow, BaseRow> {
+
+    private static final long serialVersionUID = -376944622236540545L;
+
+    private final BaseRowTypeInfo leftType;
+    private final BaseRowTypeInfo rightType;
+    private final GeneratedJoinCondition generatedJoinCondition;
+
+    private final JoinInputSideSpec leftInputSideSpec;
+    private final JoinInputSideSpec rightInputSideSpec;
+
+    // whether left side is outer side, e.g. left is outer but right is not when LEFT OUTER JOIN
+    private final boolean leftIsOuter;
+    // whether right side is outer side, e.g. right is outer but left is not when RIGHT OUTER JOIN
+    private final boolean rightIsOuter;
+
+    /**
+     * Should filter null keys.
+     */
+    private final int[] nullFilterKeys;
+
+    /**
+     * No keys need to filter null.
+     */
+    private final boolean nullSafe;
+
+    /**
+     * Filter null to all keys.
+     */
+    private final boolean filterAllNulls;
+
+    private final long minRetentionTime;
+    private final boolean stateCleaningEnabled;
+
+    private transient JoinCondition joinCondition;
+    private transient TimestampedCollector<BaseRow> collector;
+
+    private transient JoinedRow outRow;
+    private transient BaseRow leftNullRow;
+    private transient BaseRow rightNullRow;
+
+    // left join state
+    private transient JoinRecordStateView leftRecordStateView;
+    // right join state
+    private transient JoinRecordStateView rightRecordStateView;
+
+    public StreamingJoinOperator(
+            BaseRowTypeInfo leftType,
+            BaseRowTypeInfo rightType,
+            GeneratedJoinCondition generatedJoinCondition,
+            JoinInputSideSpec leftInputSideSpec,
+            JoinInputSideSpec rightInputSideSpec,
+            boolean leftIsOuter,
+            boolean rightIsOuter,
+            boolean[] filterNullKeys,
+            long minRetentionTime) {
+        this.leftType = leftType;
+        this.rightType
```
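The `nullFilterKeys`/`nullSafe`/`filterAllNulls` fields in the quoted diff implement null-aware join-key filtering. As a hedged sketch of the idea (the helper names below are hypothetical; the PR itself delegates to `NullAwareJoinHelper`): for an equality join key, a record with a NULL key field can never match and can be dropped, while a null-safe comparison such as `IS NOT DISTINCT FROM` must keep NULL keys.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of null-aware join-key filtering. filterNullKeys[i]
// is true when key position i must not be null (plain `a = b`), and false
// for null-safe comparisons (`a IS NOT DISTINCT FROM b`).
public final class NullFilterSketch {
    private NullFilterSketch() {}

    // Collect the key positions that require a null check.
    static int[] nullFilterKeys(boolean[] filterNullKeys) {
        List<Integer> positions = new ArrayList<>();
        for (int i = 0; i < filterNullKeys.length; i++) {
            if (filterNullKeys[i]) {
                positions.add(i);
            }
        }
        int[] result = new int[positions.size()];
        for (int i = 0; i < result.length; i++) {
            result[i] = positions.get(i);
        }
        return result;
    }

    // A record is filtered out when any null-checked key field is null.
    static boolean shouldFilter(int[] nullFilterKeys, Object[] key) {
        for (int pos : nullFilterKeys) {
            if (key[pos] == null) {
                return true;
            }
        }
        return false;
    }
}
```

In this reading, `nullSafe` corresponds to `nullFilterKeys` being empty and `filterAllNulls` to it covering every key position, so both are just fast paths over the general loop.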
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API URL: https://github.com/apache/flink/pull/8532#discussion_r291867994 ## File path: flink-runtime-web/web-dashboard/src/app/pages/submit/submit.component.html ## @@ -33,23 +33,23 @@ - - -{{jar.name}} -{{jar.uploaded | date:'-MM-dd, HH:mm:ss'}} + Review comment: `trackJarBy` -> `trackArtifactBy`
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API URL: https://github.com/apache/flink/pull/8532#discussion_r291865513 ## File path: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ArtifactPlanHandlerParameterTest.java ## @@ -33,19 +33,20 @@ import java.util.stream.Collectors; /** - * Tests for the parameter handling of the {@link JarPlanHandler}. + * Tests for the parameter handling of the {@link ArtifactPlanHandler}. */ -public class JarPlanHandlerParameterTest extends JarHandlerParameterTest { - private static JarPlanHandler handler; +public class ArtifactPlanHandlerParameterTest Review comment: Why do we not create a new test for `ArtifactXXX`?
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API URL: https://github.com/apache/flink/pull/8532#discussion_r291859039

## File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractArtifactPlanHeaders.java

```
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.JobPlanInfo;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for {@link ArtifactPlanHandler}.
+ */
+public abstract class AbstractArtifactPlanHeaders
+        implements MessageHeaders<ArtifactPlanRequestBody, JobPlanInfo, ArtifactPlanMessageParameters> {
+
+    @Override
+    public Class<JobPlanInfo> getResponseClass() {
+        return JobPlanInfo.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.OK;
+    }
+
+    @Override
+    public Class<ArtifactPlanRequestBody> getRequestClass() {
+        return ArtifactPlanRequestBody.class;
+    }
+
+    @Override
+    public ArtifactPlanMessageParameters getUnresolvedMessageParameters() {
+        return new ArtifactPlanMessageParameters();
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return "/artifacts/:" + ArtifactIdPathParameter.KEY + "/plan";
```

Review comment: Can we add a constant for `artifacts`, since many places use this string? Such as `ArtifactDeleteHeaders`, `ArtifactListHeaders`, etc. I know the old `AbstractJarPlanHeaders` also used the `jars` string directly, but I think we can make this improvement if it makes sense to us. What do you think?
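As a hedged illustration of the reviewer's suggestion, the shared URL segment could be hoisted into a constant. The class and constant names below are hypothetical, not the PR's actual code.

```java
// Hypothetical sketch: a single constant for the "artifacts" URL segment,
// shared by ArtifactPlanHeaders, ArtifactDeleteHeaders, ArtifactListHeaders, etc.
public final class ArtifactUrls {
    private ArtifactUrls() {}

    public static final String ARTIFACTS_URL_SEGMENT = "artifacts";

    // Mirrors getTargetRestEndpointURL in the quoted diff; the path
    // parameter key would come from ArtifactIdPathParameter.KEY.
    static String planUrl(String artifactIdPathParameterKey) {
        return "/" + ARTIFACTS_URL_SEGMENT + "/:" + artifactIdPathParameterKey + "/plan";
    }
}
```

With this, renaming the segment (as the jars-to-artifacts migration in this PR does) would touch one place instead of every headers class.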
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API URL: https://github.com/apache/flink/pull/8532#discussion_r291867959 ## File path: flink-runtime-web/web-dashboard/src/app/pages/submit/submit.component.ts ## @@ -154,7 +154,7 @@ export class SubmitComponent implements OnInit, OnDestroy { } constructor( -private jarService: JarService, +private artifactService: ArtifactService, Review comment: Line 152: `trackJarBy` -> `trackArtifactBy`?
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API URL: https://github.com/apache/flink/pull/8532#discussion_r291868569 ## File path: flink-runtime-web/web-dashboard/src/app/services/artifact.service.ts ## @@ -26,12 +26,12 @@ import { catchError, map } from 'rxjs/operators'; @Injectable({ providedIn: 'root' }) -export class JarService { +export class ArtifactService { /** - * Get uploaded jar list + * Get uploaded artifact list */ - loadJarList() { -return this.httpClient.get(`${BASE_URL}/jars`).pipe( + loadArtifactList() { +return this.httpClient.get(`${BASE_URL}/artifacts`).pipe( Review comment: Should we rename the `JarListInterface` to `ArtifactListInterface` in `jar.js`?
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API URL: https://github.com/apache/flink/pull/8532#discussion_r291865385 ## File path: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ArtifactPlanHandlerParameterTest.java ## @@ -94,27 +95,28 @@ JarPlanMessageParameters getWrongJarMessageParameters(ProgramArgsParType program } @Override - JarPlanRequestBody getDefaultJarRequestBody() { - return new JarPlanRequestBody(); + ArtifactPlanRequestBody getDefaultJarRequestBody() { Review comment: `getDefaultJarRequestBody` -> `getDefaultArtifactRequestBody`
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API URL: https://github.com/apache/flink/pull/8532#discussion_r291867360 ## File path: flink-runtime-web/web-dashboard/src/app/pages/submit/submit.component.html ## @@ -33,23 +33,23 @@ - - -{{jar.name}} -{{jar.uploaded | date:'-MM-dd, HH:mm:ss'}} + Review comment: How can we submit a `py` job? ![image](https://user-images.githubusercontent.com/22488084/59169831-78270f80-8b6e-11e9-9cc3-e59fa666ee02.png)
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API URL: https://github.com/apache/flink/pull/8532#discussion_r291867447 ## File path: flink-runtime-web/web-dashboard/src/app/pages/submit/submit.component.html ## @@ -33,23 +33,23 @@ - - -{{jar.name}} -{{jar.uploaded | date:'-MM-dd, HH:mm:ss'}} + Review comment: `listOfJar` -> `listOfArtifacts`?
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API URL: https://github.com/apache/flink/pull/8532#discussion_r291868440 ## File path: flink-runtime-web/web-dashboard/src/app/pages/submit/submit.component.ts ## @@ -23,7 +23,7 @@ import { Router } from '@angular/router'; import { JarFilesItemInterface } from 'interfaces'; Review comment: Should we rename the `JarFilesItemInterface` to `ArtifactFilesItemInterface` in `jar.js`?
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API URL: https://github.com/apache/flink/pull/8532#discussion_r291868098 ## File path: flink-runtime-web/web-dashboard/src/app/pages/submit/submit.component.ts ## @@ -69,45 +69,45 @@ export class SubmitComponent implements OnInit, OnDestroy { } /** - * Delete jar - * @param jar + * Delete artifact + * @param artifact */ - deleteJar(jar: JarFilesItemInterface) { -this.jarService.deleteJar(jar.id).subscribe(() => { + deleteArtifact(artifact: JarFilesItemInterface) { Review comment: `JarFilesItemInterface` -> `ArtifactFilesItemInterface`?
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API URL: https://github.com/apache/flink/pull/8532#discussion_r291865337 ## File path: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ArtifactPlanHandlerParameterTest.java ## @@ -33,19 +33,20 @@ import java.util.stream.Collectors; /** - * Tests for the parameter handling of the {@link JarPlanHandler}. + * Tests for the parameter handling of the {@link ArtifactPlanHandler}. */ -public class JarPlanHandlerParameterTest extends JarHandlerParameterTest { - private static JarPlanHandler handler; +public class ArtifactPlanHandlerParameterTest Review comment: Replace all `jar` with `Artifact`? Such as `getWrongJarMessageParameters` -> `getWrongArtifactMessageParameters`?
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API URL: https://github.com/apache/flink/pull/8532#discussion_r291865173 ## File path: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ArtifactPlanHandlerParameterTest.java ## @@ -56,13 +57,13 @@ public static void setup() throws Exception { } @Override - JarPlanMessageParameters getUnresolvedJarMessageParameters() { + ArtifactPlanMessageParameters getUnresolvedJarMessageParameters() { return handler.getMessageHeaders().getUnresolvedMessageParameters(); } @Override - JarPlanMessageParameters getJarMessageParameters(ProgramArgsParType programArgsParType) { - final JarPlanMessageParameters parameters = getUnresolvedJarMessageParameters(); + ArtifactPlanMessageParameters getJarMessageParameters(ProgramArgsParType programArgsParType) { Review comment: `getJarMessageParameters` -> `getArtifactMessageParameters`?
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API URL: https://github.com/apache/flink/pull/8532#discussion_r291861623 ## File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ArtifactRequestBody.java ## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer; +import org.apache.flink.runtime.rest.messages.json.JobIDSerializer; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import javax.annotation.Nullable; + +import java.util.List; + +/** + * Base class for {@link RequestBody} for running an artifact or querying the plan. 
+ */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public abstract class ArtifactRequestBody implements RequestBody { + + static final String FIELD_NAME_ENTRY_CLASS = "entryClass"; + static final String FIELD_NAME_PROGRAM_ARGUMENTS = "programArgs"; + static final String FIELD_NAME_PROGRAM_ARGUMENTS_LIST = "programArgsList"; + static final String FIELD_NAME_PARALLELISM = "parallelism"; + static final String FIELD_NAME_DEPENDENT_ARTIFACT_ID = "dependentArtifactId"; + static final String FIELD_NAME_JOB_ID = "jobId"; + + @JsonProperty(FIELD_NAME_ENTRY_CLASS) + @Nullable + private String entryClassName; + + @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) + @Nullable + private String programArguments; + + @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST) + @Nullable + private List<String> programArgumentsList; + + @JsonProperty(FIELD_NAME_PARALLELISM) + @Nullable + private Integer parallelism; + + @JsonProperty(FIELD_NAME_DEPENDENT_ARTIFACT_ID) + @Nullable + private String dependentArtifactId; + + @JsonProperty(FIELD_NAME_JOB_ID) + @JsonDeserialize(using = JobIDDeserializer.class) + @JsonSerialize(using = JobIDSerializer.class) + @Nullable + private JobID jobId; + + ArtifactRequestBody() { + this(null, null, null, null, null, null); + } + + @JsonCreator + ArtifactRequestBody( + @Nullable @JsonProperty(FIELD_NAME_ENTRY_CLASS) String entryClassName, + @Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String programArguments, + @Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST) List<String> programArgumentsList, + @Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism, + @Nullable @JsonProperty(FIELD_NAME_DEPENDENT_ARTIFACT_ID) String dependentArtifactId, + @Nullable @JsonProperty(FIELD_NAME_JOB_ID) JobID jobId) { + this.entryClassName = entryClassName; + this.programArguments = programArguments; + this.programArgumentsList = programArgumentsList; + this.parallelism = parallelism; + this.dependentArtifactId = dependentArtifactId; + this.jobId = jobId; + } + + @Nullable + 
@JsonIgnore + public String getEntryClassName() { + return entryClassName; + } + + @Nullable + @JsonIgnore + public String getProgramArguments() { + return programArguments; + } + + @Nullable + @JsonIgnore + public List<String> getProgramArgumentsList() { + return programArgumentsList; + } + + @Nullable +
[GitHub] [flink] docete commented on a change in pull request #8626: [FLINK-12742] Add insert into partition grammar as hive dialect
docete commented on a change in pull request #8626: [FLINK-12742] Add insert into partition grammar as hive dialect URL: https://github.com/apache/flink/pull/8626#discussion_r291868992 ## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropTable.java ## @@ -33,7 +33,7 @@ /** * DROP TABLE DDL sql call. */ -public class SqlDropTable extends SqlDrop { +public class SqlDropTable extends SqlDrop implements ExtendedSqlType { Review comment: ExtendedSqlNode?
[jira] [Updated] (FLINK-12765) JobMaster calculates resource needs and requests slot for the entire slot sharing group.
[ https://issues.apache.org/jira/browse/FLINK-12765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Gao updated FLINK-12765: Description: In this version, a task will always request a slot with its own resource need. If the resource need is less than the default slot resource, it will always be allocated to a default-sized slot. The extra resources in the slot leave chances for other tasks within the same slot sharing group to fit in. To take these chances, the SlotPool will maintain the available resources of each allocated slot. The available resource of an allocated slot should always be the total resource of the slot minus the resources of tasks already assigned onto the slot. In this way, the SlotPool would be able to determine whether another task can fit into the slot. If a task cannot fit into the slot, for a slot sharing group the SlotPool should request another slot from the ResourceManager, and for a colocation group it should fail the job. > JobMaster calculates resource needs and requests slot for the entire slot > sharing group. > > > Key: FLINK-12765 > URL: https://issues.apache.org/jira/browse/FLINK-12765 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.8.0, 1.9.0 >Reporter: Tony Xintong Song >Assignee: Yun Gao >Priority: Major > Fix For: 1.9.0 > > > In this version, a task will always request a slot with its own resource need. > If the resource need is less than the default slot resource, it will always > be allocated to a default-sized slot. > > The extra resources in the slot leave chances for other tasks within the > same slot sharing group to fit in. To take these chances, the SlotPool will > maintain the available resources of each allocated slot. The available resource of an > allocated slot should always be the total resource of the slot minus the > resources of tasks already assigned onto the slot. In this way, the SlotPool > would be able to determine whether another task can fit into the slot. If a > task cannot fit into the slot, for a slot sharing group the SlotPool should > request another slot from the ResourceManager, and for a colocation group it > should fail the job. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
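To make the bookkeeping concrete, here is a minimal, hypothetical sketch of the fit-check described above. The class name and the resource dimensions (CPU cores, memory) are illustrative only, not Flink's actual SlotPool API:

```java
// Hypothetical sketch: track the remaining resource of an allocated slot and
// only assign a task onto it if the task's resource need fits. A failed
// assignment means the SlotPool should request a new slot from the
// ResourceManager (slot sharing group) or fail the job (colocation group).
public class SlotAvailability {
    private double availableCpu;
    private long availableMemoryMb;

    public SlotAvailability(double totalCpu, long totalMemoryMb) {
        this.availableCpu = totalCpu;
        this.availableMemoryMb = totalMemoryMb;
    }

    // Deducts the task's resources and returns true if the task fits,
    // leaving the slot's available resource reduced accordingly.
    public boolean tryAssign(double taskCpu, long taskMemoryMb) {
        if (taskCpu <= availableCpu && taskMemoryMb <= availableMemoryMb) {
            availableCpu -= taskCpu;
            availableMemoryMb -= taskMemoryMb;
            return true;
        }
        return false;
    }
}
```

Two half-sized tasks would share one default slot; a third task would be rejected and trigger a new slot request.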
[GitHub] [flink] Myasuka commented on a change in pull request #8571: [FLINK-12682][connectors] StringWriter support custom row delimiter
Myasuka commented on a change in pull request #8571: [FLINK-12682][connectors] StringWriter support custom row delimiter URL: https://github.com/apache/flink/pull/8571#discussion_r291866487 ## File path: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java ## @@ -82,7 +87,7 @@ public void open(FileSystem fs, Path path) throws IOException { public void write(T element) throws IOException { FSDataOutputStream outputStream = getStream(); outputStream.write(element.toString().getBytes(charset)); - outputStream.write('\n'); + outputStream.write(rowDelimiter.getBytes(charset)); Review comment: Sorry for late reply, I noticed that you have addressed all my suggestions on the new PR.
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8563: [FLINK-12602][travis] Correct the flink pom `artifactId` config and s…
WeiZhong94 commented on a change in pull request #8563: [FLINK-12602][travis] Correct the flink pom `artifactId` config and s… URL: https://github.com/apache/flink/pull/8563#discussion_r291866343 ## File path: flink-connectors/flink-sql-connector-kafka-0.10/pom.xml ## @@ -41,6 +41,14 @@ under the License. flink-connector-kafka-0.10_${scala.binary.version} ${project.version} +
[GitHub] [flink] flinkbot commented on issue #8669: [FLINK-11147][table][docs] Add documentation for TableAggregate Function
flinkbot commented on issue #8669: [FLINK-11147][table][docs] Add documentation for TableAggregate Function URL: https://github.com/apache/flink/pull/8669#issuecomment-500274671 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-11147) Add documentation for TableAggregate Function
[ https://issues.apache.org/jira/browse/FLINK-11147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11147: --- Labels: pull-request-available (was: ) > Add documentation for TableAggregate Function > - > > Key: FLINK-11147 > URL: https://issues.apache.org/jira/browse/FLINK-11147 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > > Add documentation for {{TableAggregateFunction}}, similar to the document of > {{AggregateFunction}}: > https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/udfs.html#aggregation-functions > Most parts of {{TableAggregateFunction}} would be same with > {{AggregateFunction}}, except for the ways of handling outputs. > {{AggregateFunction}} outputs a scalar value, while > {{TableAggregateFunction}} outputs a Table with multi rows and columns.
[GitHub] [flink] hequn8128 opened a new pull request #8669: [FLINK-11147][table][docs] Add documentation for TableAggregate Function
hequn8128 opened a new pull request #8669: [FLINK-11147][table][docs] Add documentation for TableAggregate Function URL: https://github.com/apache/flink/pull/8669 ## What is the purpose of the change This pull request adds documentation for TableAggregateFunction. Note: as discussed in the [PR of FLINK-12401](https://github.com/apache/flink/pull/8550#issuecomment-498913495), we will have two methods, `emitUpdateWithRetract` and `emitUpdateWithoutRetract`, to improve the performance of streaming jobs. However, since key definition on TableAggregateFunction is still under discussion, this PR will not cover the `emitUpdateWithRetract` method. We can open another PR to add documentation for `emitUpdateWithRetract` later. This can unblock these tasks. ## Brief change log - Add documentation for TableAggregate Function - Add Chinese documentation for TableAggregate Function - Update documentation for Aggregate Function ## Verifying this change execute `build_docs.sh` locally and check the changes. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no)
[GitHub] [flink] Myasuka commented on a change in pull request #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics …
Myasuka commented on a change in pull request #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics … URL: https://github.com/apache/flink/pull/8668#discussion_r291865661 ## File path: docs/monitoring/metrics.md ## @@ -658,6 +658,7 @@ Parameters: - `db` - the InfluxDB database to store metrics - `username` - (optional) InfluxDB username used for authentication - `password` - (optional) InfluxDB username's password used for authentication +- `rp` - (optional) InfluxDB retention policy, defaults to retention policy defined on the server Review comment: Changes to `metrics.md` should also be applied to `metrics-zh.md`. Moreover, `metrics.html` should also be updated; please refer to `flink-docs/README.md` to generate the docs.
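For illustration, the reporter section of `flink-conf.yaml` with the new `rp` option might look like the following sketch. The `one_week` policy name is a placeholder (the retention policy must already exist on the InfluxDB server), and the host/port/db values are assumptions:

```yaml
metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
metrics.reporter.influxdb.host: localhost
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink
# optional; if unset, the server's default retention policy is used
metrics.reporter.influxdb.rp: one_week
```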
[GitHub] [flink] Myasuka commented on a change in pull request #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics …
Myasuka commented on a change in pull request #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics … URL: https://github.com/apache/flink/pull/8668#discussion_r291865744 ## File path: flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/InfluxdbReporterTest.java ## @@ -57,6 +57,7 @@ private static final String TEST_INFLUXDB_DB = "test-42"; private static final String METRIC_HOSTNAME = "task-mgr-1"; private static final String METRIC_TM_ID = "tm-id-123"; + private String retentionPolicy = ""; Review comment: How about using a final variable `RETENTION_POLICY`, just like the tests above?
[GitHub] [flink] wuchong commented on issue #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator
wuchong commented on issue #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator URL: https://github.com/apache/flink/pull/8629#issuecomment-500273784 Hi @JingsongLi , I have addressed the comments.
[GitHub] [flink] lamber-ken commented on issue #8583: [FLINK-11820][serialization] SimpleStringSchema handle message record which value is null
lamber-ken commented on issue #8583: [FLINK-11820][serialization] SimpleStringSchema handle message record which value is null URL: https://github.com/apache/flink/pull/8583#issuecomment-500272524 Hi, @aljoscha @GJL, from `Kafka09Fetcher#runFetchLoop` we can see that it deserializes the Kafka value first and then calls the `emitRecord` method:

```
while (running) {
	// this blocks until we get the next records
	// it automatically re-throws exceptions encountered in the consumer thread
	final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

	// get the records for each topic partition
	for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {

		List<ConsumerRecord<byte[], byte[]>> partitionRecords =
			records.records(partition.getKafkaPartitionHandle());

		for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
			final T value = deserializer.deserialize(record);

			if (deserializer.isEndOfStream(value)) {
				// end of stream signaled
				running = false;
				break;
			}

			// emit the actual record. this also updates offset state atomically
			// and deals with timestamps and watermark generation
			emitRecord(value, partition, record.offset(), record);
		}
	}
}
```
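Since the fetch loop above deserializes every record value before `emitRecord` is called, a deserialization schema that tolerates null values (e.g. Kafka log-compaction tombstones) must check for null itself. A minimal illustrative sketch of the idea, not the actual SimpleStringSchema change in this PR:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical null-tolerant string deserializer: returns null instead of
// throwing a NullPointerException when the Kafka record value is null.
public class NullTolerantStringSchema {
    public String deserialize(byte[] message) {
        // A record with a null value (e.g. a tombstone) would otherwise
        // crash in `new String(message, ...)`.
        if (message == null) {
            return null;
        }
        return new String(message, StandardCharsets.UTF_8);
    }
}
```

The caller then decides whether a null result should be skipped, emitted, or treated as end-of-stream.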
[jira] [Assigned] (FLINK-12716) Add an interactive shell for Python Table API
[ https://issues.apache.org/jira/browse/FLINK-12716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo reassigned FLINK-12716: Assignee: Huang Xingbo > Add an interactive shell for Python Table API > - > > Key: FLINK-12716 > URL: https://issues.apache.org/jira/browse/FLINK-12716 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Reporter: Dian Fu >Assignee: Huang Xingbo >Priority: Major > > We should add an interactive shell for the Python Table API. It will have > similar functionality to the Scala Shell.
[GitHub] [flink] sunjincheng121 edited a comment on issue #8550: [FLINK-12401][table] Support incremental emit under AccRetract mode for non-window streaming FlatAggregate on Table API
sunjincheng121 edited a comment on issue #8550: [FLINK-12401][table] Support incremental emit under AccRetract mode for non-window streaming FlatAggregate on Table API URL: https://github.com/apache/flink/pull/8550#issuecomment-498913495 Since the Stream operator has two execution modes, `ACC` and `ACCRetract`, users can achieve better performance by implementing special interfaces for streaming. The table below is a quick summary.

|            | emitValue | emitUpdateWithRetract | emitUpdateWithoutRetract |
| ---------- | --------- | --------------------- | ------------------------ |
| ACC        | Y         | N                     | Y                        |
| ACCRetract | Y         | Y                     | N                        |

- emitValue - for batch and streaming.
- emitUpdateWithRetract - only for streaming in ACCRetract mode (needs key definition on TableAggregateFunction, [under discussion](https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit#heading=h.evvcpnbn30wn)).
- emitUpdateWithoutRetract - only for streaming in ACC mode.

So, in this PR, changing the method name from `emitRetractValueIncrementally` to `emitUpdateWithRetract` is better. What do you think?
[jira] [Created] (FLINK-12786) Implement local aggregation in Flink
vinoyang created FLINK-12786: Summary: Implement local aggregation in Flink Key: FLINK-12786 URL: https://issues.apache.org/jira/browse/FLINK-12786 Project: Flink Issue Type: New Feature Components: API / DataStream Reporter: vinoyang Assignee: vinoyang Currently, keyed streams are widely used to perform aggregating operations (e.g., reduce, sum and window) on the elements that have the same key. When executed at runtime, the elements with the same key will be sent to and aggregated by the same task. The performance of these aggregating operations is very sensitive to the distribution of keys. In cases where the distribution of keys follows a power law, the performance will be significantly degraded. Worse, increasing the degree of parallelism does not help when a task is overloaded by a single key. Local aggregation is a widely-adopted method to reduce the performance degradation caused by data skew. We can decompose the aggregating operations into two phases. In the first phase, we aggregate the elements of the same key at the sender side to obtain partial results. Then in the second phase, these partial results are sent to receivers according to their keys and are combined to obtain the final result. Since the number of partial results received by each receiver is limited by the number of senders, the imbalance among receivers can be reduced. Besides, by reducing the amount of transferred data the performance can be further improved. The design documentation is here: [https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing] The discussion thread is here: [http://mail-archives.apache.org/mod_mbox/flink-dev/201906.mbox/%3CCAA_=o7dvtv8zjcxknxyoyy7y_ktvgexrvb4zhxjwzuhsulz...@mail.gmail.com%3E]
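The two-phase decomposition described above can be illustrated with a simple word-count-style sketch. This is only an illustration of the idea under the issue's assumptions, not the proposed DataStream API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of two-phase (local + global) aggregation.
public class LocalAggSketch {

    // Phase 1: each "sender" pre-aggregates counts for its own partition of
    // the data, producing at most one partial result per distinct key.
    static Map<String, Long> localAggregate(List<String> partition) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : partition) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // Phase 2: the "receiver" combines the partial results. It sees at most
    // one record per key per sender, regardless of how skewed the keys are,
    // so a single hot key can no longer overload one task with raw elements.
    static Map<String, Long> globalCombine(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((k, v) -> result.merge(k, v, Long::sum));
        }
        return result;
    }
}
```

The key property is that phase 1 bounds the data volume sent to each receiver by (number of senders × number of distinct keys) rather than by the raw element count.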
[jira] [Commented] (FLINK-12785) RocksDB savepoint recovery can use a lot of unmanaged memory
[ https://issues.apache.org/jira/browse/FLINK-12785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16859657#comment-16859657 ] Congxian Qiu(klion26) commented on FLINK-12785: --- Thanks for filing the issue, I will have a look at it. > RocksDB savepoint recovery can use a lot of unmanaged memory > > > Key: FLINK-12785 > URL: https://issues.apache.org/jira/browse/FLINK-12785 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Mike Kaplinskiy >Priority: Major > > I'm running an application that's backfilling data from Kafka. There's > approximately 3 years worth of data, with a lot of watermark skew (i.e. new > partitions were created over time) and I'm using daily windows. This makes a > lot of the windows buffer their contents before the watermark catches up to > "release" them. In turn, this gives me a lot of in-flight windows (200-300) > with very large state keys in rocksdb (on the order of 40-50mb per key). > Running the pipeline tends to be mostly fine - it's not terribly fast when > appends happen but everything works. The problem comes when doing a savepoint > restore - specifically, the taskmanagers eat ram until the kernel kills it > due to being out of memory. The extra memory isn't JVM heap since the memory > usage of the process is ~4x the -Xmx value and there aren't any > {{OutOfMemoryError}} exceptions. > I traced the culprit of the memory growth to > [RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212] > . Specifically, while the keys/values are deserialized on the Java heap, > {{RocksDBWriteBatchWrapper}} forwards it to RocksDB's {{WriteBatch}} which > buffers in unmanaged memory. 
That's not in itself an issue, but > {{RocksDBWriteBatchWrapper}} flushes only based on a number of records - not > a number of bytes in-flight. Specifically, {{RocksDBWriteBatchWrapper}} will > flush only once it has 500 records, and at 40mb per key, that's at least 20Gb > of unmanaged memory before a flush. > My suggestion would be to add an additional flush criteria to > {{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 > records or 5mb buffered). This way large key writes would be immediately > flushed to RocksDB on recovery or even writes. I applied this approach and I > was able to complete a savepoint restore for my job. That said, I'm not > entirely sure what else this change would impact since I'm not very familiar > with Flink.
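The dual flush criterion suggested in the issue can be sketched as follows. The class and method names are hypothetical, not the actual `RocksDBWriteBatchWrapper` API; in the real wrapper the byte size would come from `WriteBatch#getDataSize()`:

```java
// Hypothetical sketch of the proposed flush policy: flush a write batch when
// EITHER a record-count cap or a byte-size cap is reached, so that a few very
// large keys cannot accumulate gigabytes of unmanaged memory before a flush.
public class DualCriteriaBatch {
    static final int MAX_RECORDS = 500;
    static final long MAX_BYTES = 5L * 1024 * 1024; // 5 MB

    private int records = 0;
    private long bytes = 0;
    private int flushes = 0; // counts how many times flush() ran

    void put(byte[] key, byte[] value) {
        records++;
        bytes += key.length + value.length;
        // Flush on whichever cap is hit first.
        if (records >= MAX_RECORDS || bytes >= MAX_BYTES) {
            flush();
        }
    }

    void flush() {
        // In the real wrapper this would hand the buffered batch to RocksDB.
        records = 0;
        bytes = 0;
        flushes++;
    }

    int getFlushes() {
        return flushes;
    }
}
```

With a record-count cap alone, a single 40 MB value could sit in the batch alongside up to 499 others; the byte cap forces it out almost immediately.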
[jira] [Updated] (FLINK-12785) RocksDB savepoint recovery can use a lot of unmanaged memory
[ https://issues.apache.org/jira/browse/FLINK-12785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mike Kaplinskiy updated FLINK-12785: Description: I'm running an application that's backfilling data from Kafka. There's approximately 3 years worth of data, with a lot of watermark skew (i.e. new partitions were created over time) and I'm using daily windows. This makes a lot of the windows buffer their contents before the watermark catches up to "release" them. In turn, this gives me a lot of in-flight windows (200-300) with very large state keys in rocksdb (on the order of 40-50mb per key). Running the pipeline tends to be mostly fine - it's not terribly fast when appends happen but everything works. The problem comes when doing a savepoint restore - specifically, the taskmanagers eat ram until the kernel kills it due to being out of memory. The extra memory isn't JVM heap since the memory usage of the process is ~4x the -Xmx value and there aren't any {{OutOfMemoryError}} exceptions. I traced the culprit of the memory growth to [RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212] . Specifically, while the keys/values are deserialized on the Java heap, {{RocksDBWriteBatchWrapper}} forwards it to RocksDB's {{WriteBatch}} which buffers in unmanaged memory. That's not in itself an issue, but {{RocksDBWriteBatchWrapper}} flushes only based on a number of records - not a number of bytes in-flight. Specifically, {{RocksDBWriteBatchWrapper}} will flush only once it has 500 records, and at 40mb per key, that's at least 20Gb of unmanaged memory before a flush. My suggestion would be to add an additional flush criteria to {{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 records or 5mb buffered). 
This way large key writes would be immediately flushed to RocksDB on recovery or even writes. I applied this approach and I was able to complete a savepoint restore for my job. That said, I'm not entirely sure what else this change would impact since I'm not very familiar with Flink. was: I'm running an application that's backfilling data from Kafka. There's approximately 3 years worth of data, with a lot of watermark skew (i.e. new partitions were created over time) and I'm using daily windows. This makes a lot of the windows buffer their contents before the watermark catches up to "release" them. In turn, this gives me a lot of in-flight windows (200-300) with very large state keys in rocksdb (on the order of 40-50mb per key). Running the pipeline tends to be mostly fine - it's not terribly fast when appends happen but everything works. The problem comes when doing a savepoint restore - specifically, the taskmanagers eat ram until the kernel kills it due to being out of memory. The extra memory isn't JVM heap since the memory usage of the process is ~4x the -Xmx value and there aren't any {{OutOfMemoryError}} exceptions. I traced the culprit of the memory growth to [RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212] . Specifically, while the keys/values are deserialized on the Java heap, {{RocksDBWriteBatchWrapper}} forwards it to RocksDB's {{WriteBatch}} which buffers in unmanaged memory. That's not in itself an issue, but {{RocksDBWriteBatchWrapper}} flushes only based on a number of records - not a number of bytes in-flight. Specifically, {{RocksDBWriteBatchWrapper}} will flush only once it has 500 records, and at 40mb per key, that's at least 20Gb of unmanaged memory before a flush. 
My suggestion would be to add an additional flush criteria to {{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 records or 5mb buffered). This way large key writes would be immediately flushed to RocksDB on recovery or even writes. I applied this approach and I was able to complete a savepoint restore for my jon. That said, I'm not entirely sure what else this change would impact since I'm not very familiar with Flink. > RocksDB savepoint recovery can use a lot of unmanaged memory > > > Key: FLINK-12785 > URL: https://issues.apache.org/jira/browse/FLINK-12785 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Mike Kaplinskiy >Priority: Major > > I'm running an application that's backfilling data from Kafka. There's > approximately 3 years worth of data,
[jira] [Updated] (FLINK-12785) RocksDB savepoint recovery can use a lot of unmanaged memory
[ https://issues.apache.org/jira/browse/FLINK-12785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mike Kaplinskiy updated FLINK-12785: Description: I'm running an application that's backfilling data from Kafka. There's approximately 3 years worth of data, with a lot of watermark skew (i.e. new partitions were created over time) and I'm using daily windows. This makes a lot of the windows buffer their contents before the watermark catches up to "release" them. In turn, this gives me a lot of in-flight windows (200-300) with very large state keys in rocksdb (on the order of 40-50mb per key). Running the pipeline tends to be mostly fine - it's not terribly fast when appends happen but everything works. The problem comes when doing a savepoint restore - specifically, the taskmanagers eat ram until the kernel kills it due to being out of memory. The extra memory isn't JVM heap since the memory usage of the process is ~4x the -Xmx value and there aren't any {{OutOfMemoryError}} exceptions. I traced the culprit of the memory growth to [RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212] . Specifically, while the keys/values are deserialized on the Java heap, {{RocksDBWriteBatchWrapper}} forwards it to RocksDB's {{WriteBatch}} which buffers in unmanaged memory. That's not in itself an issue, but {{RocksDBWriteBatchWrapper}} flushes only based on a number of records - not a number of bytes in-flight. Specifically, {{RocksDBWriteBatchWrapper}} will flush only once it has 500 records, and at 40mb per key, that's at least 20Gb of unmanaged memory before a flush. My suggestion would be to add an additional flush criteria to {{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 records or 5mb buffered). 
This way large key writes would be immediately flushed to RocksDB on recovery or even writes. I applied this approach and I was able to complete a savepoint restore for my job. That said, I'm not entirely sure what else this change would impact since I'm not very familiar with Flink.

was: I'm running an application that's backfilling data from Kafka. There's approximately 3 years worth of data, with a lot of watermark skew (i.e. new partitions were created over time) and I'm using daily windows. This makes a lot of the windows buffer their contents before the watermark catches up to "release" them. In turn, this gives me a lot of in-flight windows (200-300) with very large state keys in rocksdb (on the order of 40-50mb per key). Running the pipeline tends to be mostly fine - it's not terribly fast when appends happen but everything works. The problem comes when doing a savepoint restore - specifically, the taskmanagers eat ram until the kernel kills it due to being out of memory. The extra memory isn't JVM heap since the memory usage of the process is ~4x the -Xmx value and there aren't any {{OutOfMemoryError}} exceptions. I traced the culprit of the memory growth to [RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212] . Specifically, while the keys/values are deserialized on the Java heap, {{RocksDBWriteBatchWrapper}} forwards it to RocksDB's {{WriteBatch}} which buffers in unmanaged memory. That's not in itself an issue, but {{RocksDBWriteBatchWrapper}} flushes only based on a number of records - not a number of bytes in-flight. Specifically, {{RocksDBWriteBatchWrapper}} will flush only once it has 500 records, and at 40mb per key, that's at least 20Gb of managed memory before a flush.
My suggestion would be to add an additional flush criteria to {{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 records or 5mb buffered). This way large key writes would be immediately flushed to RocksDB on recovery or even writes. I applied this approach and I was able to complete a savepoint restore for my job. That said, I'm not entirely sure what else this change would impact since I'm not very familiar with Flink.

> RocksDB savepoint recovery can use a lot of unmanaged memory > > > Key: FLINK-12785 > URL: https://issues.apache.org/jira/browse/FLINK-12785 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Mike Kaplinskiy >Priority: Major > > I'm running an application that's backfilling data from Kafka. There's > approximately 3 years worth of data,
[GitHub] [flink] becketqin commented on issue #8653: [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers are recycled when FlinkKafk…
becketqin commented on issue #8653: [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers are recycled when FlinkKafk… URL: https://github.com/apache/flink/pull/8653#issuecomment-500265816 @pnowojski Will you have time to take a look? It is a small patch and hopefully won't take too much time. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-12785) RocksDB savepoint recovery can use a lot of unmanaged memory
Mike Kaplinskiy created FLINK-12785: --- Summary: RocksDB savepoint recovery can use a lot of unmanaged memory Key: FLINK-12785 URL: https://issues.apache.org/jira/browse/FLINK-12785 Project: Flink Issue Type: Bug Components: Runtime / State Backends Reporter: Mike Kaplinskiy I'm running an application that's backfilling data from Kafka. There's approximately 3 years worth of data, with a lot of watermark skew (i.e. new partitions were created over time) and I'm using daily windows. This makes a lot of the windows buffer their contents before the watermark catches up to "release" them. In turn, this gives me a lot of in-flight windows (200-300) with very large state keys in rocksdb (on the order of 40-50mb per key). Running the pipeline tends to be mostly fine - it's not terribly fast when appends happen but everything works. The problem comes when doing a savepoint restore - specifically, the taskmanagers eat ram until the kernel kills it due to being out of memory. The extra memory isn't JVM heap since the memory usage of the process is ~4x the -Xmx value and there aren't any {{OutOfMemoryError}} exceptions. I traced the culprit of the memory growth to [RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212] . Specifically, while the keys/values are deserialized on the Java heap, {{RocksDBWriteBatchWrapper}} forwards it to RocksDB's {{WriteBatch}} which buffers in unmanaged memory. That's not in itself an issue, but {{RocksDBWriteBatchWrapper}} flushes only based on a number of records - not a number of bytes in-flight. Specifically, {{RocksDBWriteBatchWrapper}} will flush only once it has 500 records, and at 40mb per key, that's at least 20Gb of managed memory before a flush. 
My suggestion would be to add an additional flush criteria to {{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 records or 5mb buffered). This way large key writes would be immediately flushed to RocksDB on recovery or even writes. I applied this approach and I was able to complete a savepoint restore for my job. That said, I'm not entirely sure what else this change would impact since I'm not very familiar with Flink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
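The dual flush condition proposed above (flush on a record count or a byte threshold, whichever trips first) can be sketched with a simplified model. `BatchWrapperSketch` is a hypothetical stand-in that only models the flush policy; the real `RocksDBWriteBatchWrapper` wraps RocksDB's `WriteBatch` and would consult `batch.getDataSize()` instead of counting bytes itself:

```java
// Simplified model of a write-batch wrapper that flushes on EITHER a record
// count or a byte threshold. The thresholds mirror the proposal in the issue
// (500 records or 5 MB buffered), so a single 40 MB key no longer waits
// behind 499 more puts before being written out.
class BatchWrapperSketch {
    static final int MAX_RECORDS = 500;
    static final long MAX_BYTES = 5L * 1024 * 1024;

    private int records = 0;
    private long bytes = 0;
    int flushCount = 0; // visible for the usage example below

    void put(byte[] key, byte[] value) {
        records++;
        bytes += key.length + value.length;
        if (records >= MAX_RECORDS || bytes >= MAX_BYTES) {
            flush();
        }
    }

    void flush() {
        // In the real wrapper this would call db.write(writeOptions, batch)
        // and clear the underlying WriteBatch.
        flushCount++;
        records = 0;
        bytes = 0;
    }
}
```

With 40 MB values, every put would exceed the byte threshold and flush immediately, bounding the unmanaged buffer at roughly one value instead of 500.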
[jira] [Commented] (FLINK-12776) Ambiguous content in flink-dist NOTICE file
[ https://issues.apache.org/jira/browse/FLINK-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16859648#comment-16859648 ] sunjincheng commented on FLINK-12776: - We can continue to discuss it in the JIRA. > Ambiguous content in flink-dist NOTICE file > --- > > Key: FLINK-12776 > URL: https://issues.apache.org/jira/browse/FLINK-12776 > Project: Flink > Issue Type: Improvement > Components: API / Python, Release System >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Priority: Blocker > Fix For: 1.9.0 > > Attachments: image-2019-06-10-09-39-06-637.png > > > With FLINK-12409 we include the new flink-python module in flink-dist. As a > result we now have 2 {{flink-python}} entries in the flink-dist NOTICE file, > one for the old batch API and one for the newly added one, which is > ambiguous. We should rectify this by either excluding the old batch API from > flink-dist, or rename the new module to something like {{flink-api-python}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12776) Ambiguous content in flink-dist NOTICE file
[ https://issues.apache.org/jira/browse/FLINK-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16859647#comment-16859647 ] sunjincheng commented on FLINK-12776: - Copying the discussion history here: !image-2019-06-10-09-39-06-637.png! > Ambiguous content in flink-dist NOTICE file > --- > > Key: FLINK-12776 > URL: https://issues.apache.org/jira/browse/FLINK-12776 > Project: Flink > Issue Type: Improvement > Components: API / Python, Release System >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Priority: Blocker > Fix For: 1.9.0 > > Attachments: image-2019-06-10-09-39-06-637.png > > > With FLINK-12409 we include the new flink-python module in flink-dist. As a > result we now have 2 {{flink-python}} entries in the flink-dist NOTICE file, > one for the old batch API and one for the newly added one, which is > ambiguous. We should rectify this by either excluding the old batch API from > flink-dist, or rename the new module to something like {{flink-api-python}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12776) Ambiguous content in flink-dist NOTICE file
[ https://issues.apache.org/jira/browse/FLINK-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-12776: Attachment: image-2019-06-10-09-39-06-637.png > Ambiguous content in flink-dist NOTICE file > --- > > Key: FLINK-12776 > URL: https://issues.apache.org/jira/browse/FLINK-12776 > Project: Flink > Issue Type: Improvement > Components: API / Python, Release System >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Priority: Blocker > Fix For: 1.9.0 > > Attachments: image-2019-06-10-09-39-06-637.png > > > With FLINK-12409 we include the new flink-python module in flink-dist. As a > result we now have 2 {{flink-python}} entries in the flink-dist NOTICE file, > one for the old batch API and one for the newly added one, which is > ambiguous. We should rectify this by either excluding the old batch API from > flink-dist, or rename the new module to something like {{flink-api-python}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] becketqin commented on a change in pull request #8653: [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers are recycled when FlinkKafk…
becketqin commented on a change in pull request #8653: [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers are recycled when FlinkKafk…
URL: https://github.com/apache/flink/pull/8653#discussion_r291856564

## File path: flink-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties

## @@ -16,15 +16,14 @@
 # limitations under the License.

-log4j.rootLogger=INFO, testlogger
+log4j.rootLogger=OFF, testlogger

Review comment: In general, unit tests should run silently without log4j logs by default; otherwise they pollute the screen and slow down the tests. Failures should be reported via the testing framework, such as JUnit/TestNG. If detailed logs are needed for debugging, one should manually turn the log4j logs back on. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8623: [FLINK-12719][python] Add the Python catalog API
bowenli86 commented on a change in pull request #8623: [FLINK-12719][python] Add the Python catalog API
URL: https://github.com/apache/flink/pull/8623#discussion_r291852777

## File path: flink-python/pyflink/table/catalog.py

## @@ -0,0 +1,909 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from py4j.java_gateway import java_import
+
+from pyflink.java_gateway import get_gateway
+from pyflink.table.table_schema import TableSchema
+
+__all__ = ['Catalog', 'CatalogDatabase', 'CatalogBaseTable', 'CatalogPartition', 'CatalogFunction',
+           'ObjectPath', 'CatalogPartitionSpec', 'CatalogTableStatistics',
+           'CatalogColumnStatistics', 'HiveCatalog', 'HiveCatalogDatabase', 'HiveCatalogFunction',
+           'HiveCatalogPartition', 'HiveCatalogTable', 'HiveCatalogView']
+
+
+class Catalog(object):
+    """
+    Catalog is responsible for reading and writing metadata such as database/table/views/UDFs
+    from a registered catalog. It connects a registered catalog and Flink's Table API.
+    """
+
+    def __init__(self, j_catalog):
+        self._j_catalog = j_catalog
+
+    def get_default_database(self):
+        """
+        Get the name of the default database for this catalog. The default database will be the
+        current database for the catalog when user's session doesn't specify a current database.
+        The value probably comes from configuration, will not change for the life time of the
+        catalog instance.
+
+        :return: The name of the current database.
+        """
+        return self._j_catalog.getDefaultDatabase()
+
+    def list_databases(self):
+        """
+        Get the names of all databases in this catalog.
+
+        :return: A list of the names of all databases.
+        """
+        return list(self._j_catalog.listDatabases())
+
+    def get_database(self, database_name):
+        """
+        Get a database from this catalog.
+
+        :param database_name: Name of the database.
+        :return: The requested database.
+        """
+        return CatalogDatabase(self._j_catalog.getDatabase(database_name))
+
+    def database_exists(self, database_name):
+        """
+        Check if a database exists in this catalog.
+
+        :param database_name: Name of the database.
+        :return: true if the given database exists in the catalog false otherwise.
+        """
+        return self._j_catalog.databaseExists(database_name)
+
+    def create_database(self, name, database, ignore_if_exists):
+        """
+        Create a database.
+
+        :param name: Name of the database to be created.
+        :param database: The database definition.
+        :param ignore_if_exists: Flag to specify behavior when a database with the given name
+                                 already exists:
+                                 if set to false, throw a DatabaseAlreadyExistException,
+                                 if set to true, do nothing.
+        """
+        self._j_catalog.createDatabase(name, database._j_catalog_database, ignore_if_exists)
+
+    def drop_database(self, name, ignore_if_exists):
+        """
+        Drop a database.
+
+        :param name: Name of the database to be dropped.
+        :param ignore_if_exists: Flag to specify behavior when the database does not exist:
+                                 if set to false, throw an exception,
+                                 if set to true, do nothing.
+        """
+        self._j_catalog.dropDatabase(name, ignore_if_exists)
+
+    def alter_database(self, name, new_database, ignore_if_not_exists):
+        """
+        Modify an existing database.
+
+        :param name: Name of the database to be modified.
+        :param new_database: The new database definition.
+        :param ignore_if_not_exists: Flag to specify behavior when the given database does not
+                                     exist:
+                                     if set to false, throw an exception,
+                                     if set to true, do
[jira] [Commented] (FLINK-12620) Deadlock in task deserialization
[ https://issues.apache.org/jira/browse/FLINK-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16859581#comment-16859581 ] Mike Kaplinskiy commented on FLINK-12620: - Unfortunately my use-case is a bit strange - I'm running Flink via Beam - and via a Clojure API on top of Beam at that. Attached [^BatchJob.java] is my attempt to recreate the issue - via Flink's Java API. I triggered it by making a diamond - but I'm pretty sure you can trigger this issue easier via the stream api - where there is always more than 1 subtask running. Also attached is the jstack output [^jstack_repro.txt] from trying to run the job. Let me know if you need anything else. > Deadlock in task deserialization > > > Key: FLINK-12620 > URL: https://issues.apache.org/jira/browse/FLINK-12620 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.8.0 >Reporter: Mike Kaplinskiy >Priority: Major > Attachments: BatchJob.java, jstack_repro.txt, jstack_snippet.txt > > > When running a batch job, I ran into an issue where task deserialization > caused a deadlock. Specifically, if you have a static initialization > dependency graph that looks like this (these are all classes): > {code:java} > Task1 depends on A > A depends on B > B depends on C > C depends on B [cycle] > Task2 depends on C{code} > What seems to happen is a deadlock. Specifically, threads are started on the > task managers that simultaneously call BatchTask.instantiateUserCode on both > Task1 and Task2. This starts deserializing the classes and initializing them. > Here's the deadlock scenario, as a stack: > {code:java} > Time> > T1: [deserialize] -> Task1 -> A -> B -> (wait for > C) > T2: [deserialize] -> Task2 -> C -> (wait for > B){code} > > A similar scenario from the web: > [https://www.farside.org.uk/201510/deadlocks_in_java_class_initialisation] . 
> > For my specific problem, I'm running into this within Clojure - > {{clojure.lang.RT}} has a dep on {{clojure.lang.Util}} which has a dep with > {{clojure.lang.Numbers}} which depends on {{clojure.lang.RT}} again. > Deserializing different clojure functions calls one or the other first which > deadlocks task managers. > > I built a version of flink-core that had > {{org.apache.flink.util.InstantiationUtil.readObjectFromConfig}} > synchronized, but I'm not sure that it's the proper fix. I'm happy to submit > that as a patch, but I'm not familiar enough with the codebase to say that > it's the correct solution - ideally all Java class loading is synchronized, > but I'm not sure how to do that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
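The initialization cycle described in the issue can be sketched with two illustrative classes (these are stand-ins, not the actual Clojure classes). Run from a single thread the cycle resolves, because the JVM lets a thread re-enter a class it is already initializing; with two threads entering the cycle from opposite ends (as the two subtasks do), each can hold one class's initialization lock while blocking on the other's:

```java
// Illustrative static-initialization cycle: B and C each trigger the other's
// <clinit>. One thread can initialize both (re-entrant access to a class the
// current thread is already initializing is permitted, so seed() returns even
// though VALUE is not yet assigned). Two threads starting at B and C
// respectively can each acquire one init lock and wait forever on the other.
class B {
    static final int VALUE;
    static {
        VALUE = C.seed() + 1; // triggers C's initialization
    }
    static int seed() { return 10; }
}

class C {
    static final int VALUE;
    static {
        VALUE = B.seed() + 1; // triggers B's initialization: the cycle
    }
    static int seed() { return 20; }
}
```

Touching either class from a single thread completes initialization of both, which is why the job works until concurrent task deserialization races the two entry points.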
[jira] [Updated] (FLINK-12620) Deadlock in task deserialization
[ https://issues.apache.org/jira/browse/FLINK-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mike Kaplinskiy updated FLINK-12620: Attachment: jstack_repro.txt > Deadlock in task deserialization > > > Key: FLINK-12620 > URL: https://issues.apache.org/jira/browse/FLINK-12620 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.8.0 >Reporter: Mike Kaplinskiy >Priority: Major > Attachments: BatchJob.java, jstack_repro.txt, jstack_snippet.txt > > > When running a batch job, I ran into an issue where task deserialization > caused a deadlock. Specifically, if you have a static initialization > dependency graph that looks like this (these are all classes): > {code:java} > Task1 depends on A > A depends on B > B depends on C > C depends on B [cycle] > Task2 depends on C{code} > What seems to happen is a deadlock. Specifically, threads are started on the > task managers that simultaneously call BatchTask.instantiateUserCode on both > Task1 and Task2. This starts deserializing the classes and initializing them. > Here's the deadlock scenario, as a stack: > {code:java} > Time> > T1: [deserialize] -> Task1 -> A -> B -> (wait for > C) > T2: [deserialize] -> Task2 -> C -> (wait for > B){code} > > A similar scenario from the web: > [https://www.farside.org.uk/201510/deadlocks_in_java_class_initialisation] . > > For my specific problem, I'm running into this within Clojure - > {{clojure.lang.RT}} has a dep on {{clojure.lang.Util}} which has a dep with > {{clojure.lang.Numbers}} which depends on {{clojure.lang.RT}} again. > Deserializing different clojure functions calls one or the other first which > deadlocks task managers. > > I built a version of flink-core that had > {{org.apache.flink.util.InstantiationUtil.readObjectFromConfig}} > synchronized, but I'm not sure that it's the proper fix. 
I'm happy to submit > that as a patch, but I'm not familiar enough with the codebase to say that > it's the correct solution - ideally all Java class loading is synchronized, > but I'm not sure how to do that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12620) Deadlock in task deserialization
[ https://issues.apache.org/jira/browse/FLINK-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mike Kaplinskiy updated FLINK-12620: Attachment: BatchJob.java > Deadlock in task deserialization > > > Key: FLINK-12620 > URL: https://issues.apache.org/jira/browse/FLINK-12620 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 1.8.0 >Reporter: Mike Kaplinskiy >Priority: Major > Attachments: BatchJob.java, jstack_snippet.txt > > > When running a batch job, I ran into an issue where task deserialization > caused a deadlock. Specifically, if you have a static initialization > dependency graph that looks like this (these are all classes): > {code:java} > Task1 depends on A > A depends on B > B depends on C > C depends on B [cycle] > Task2 depends on C{code} > What seems to happen is a deadlock. Specifically, threads are started on the > task managers that simultaneously call BatchTask.instantiateUserCode on both > Task1 and Task2. This starts deserializing the classes and initializing them. > Here's the deadlock scenario, as a stack: > {code:java} > Time> > T1: [deserialize] -> Task1 -> A -> B -> (wait for > C) > T2: [deserialize] -> Task2 -> C -> (wait for > B){code} > > A similar scenario from the web: > [https://www.farside.org.uk/201510/deadlocks_in_java_class_initialisation] . > > For my specific problem, I'm running into this within Clojure - > {{clojure.lang.RT}} has a dep on {{clojure.lang.Util}} which has a dep with > {{clojure.lang.Numbers}} which depends on {{clojure.lang.RT}} again. > Deserializing different clojure functions calls one or the other first which > deadlocks task managers. > > I built a version of flink-core that had > {{org.apache.flink.util.InstantiationUtil.readObjectFromConfig}} > synchronized, but I'm not sure that it's the proper fix. 
I'm happy to submit > that as a patch, but I'm not familiar enough with the codebase to say that > it's the correct solution - ideally all Java class loading is synchronized, > but I'm not sure how to do that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] asfgit closed pull request #8561: [FLINK-12588][python] Add TableSchema for Python Table API.
asfgit closed pull request #8561: [FLINK-12588][python] Add TableSchema for Python Table API. URL: https://github.com/apache/flink/pull/8561 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-12588) Add TableSchema for Python Table API
[ https://issues.apache.org/jira/browse/FLINK-12588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-12588. --- Resolution: Fixed Fixed in master: 8eaa2d038e9672c3a1364e1b6699b2305b8a04cb > Add TableSchema for Python Table API > > > Key: FLINK-12588 > URL: https://issues.apache.org/jira/browse/FLINK-12588 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Reporter: Wei Zhong >Assignee: Wei Zhong >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > As discussed in the PR of FLINK-12439, we need to add TableSchema to Python > Table API after FLINK-12408 is committed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12541) Add deploy a Python Flink job and session cluster on Kubernetes support.
[ https://issues.apache.org/jira/browse/FLINK-12541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16859554#comment-16859554 ] sunjincheng commented on FLINK-12541: - Thanks for the reminder [~dian.fu]. I think opening only one PR per JIRA is good for Flink. Either creating a new JIRA for part 2 or putting the part-2 changes as multiple commits in one PR makes sense to me. So feel free to do whichever you like :) > Add deploy a Python Flink job and session cluster on Kubernetes support. > > > Key: FLINK-12541 > URL: https://issues.apache.org/jira/browse/FLINK-12541 > Project: Flink > Issue Type: Sub-task > Components: API / Python, Runtime / REST >Affects Versions: 1.9.0 >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > Add deploy a Python Flink job and session cluster on Kubernetes support. > We need to have the same deployment step as the Java job. Please see: > [https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] sunjincheng121 commented on issue #8623: [FLINK-12719][python] Add the Python catalog API
sunjincheng121 commented on issue #8623: [FLINK-12719][python] Add the Python catalog API URL: https://github.com/apache/flink/pull/8623#issuecomment-500240970 @flinkbot approve consensus This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on issue #8561: [FLINK-12588][python] Add TableSchema for Python Table API.
sunjincheng121 commented on issue #8561: [FLINK-12588][python] Add TableSchema for Python Table API. URL: https://github.com/apache/flink/pull/8561#issuecomment-500240339 +1 to merge. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on issue #8653: [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers are recycled when FlinkKafk…
sunjincheng121 commented on issue #8653: [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers are recycled when FlinkKafk… URL: https://github.com/apache/flink/pull/8653#issuecomment-500239955 @flinkbot approve description This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8653: [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers are recycled when FlinkKafk…
sunjincheng121 commented on a change in pull request #8653: [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers are recycled when FlinkKafk… URL: https://github.com/apache/flink/pull/8653#discussion_r291848024

## File path: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java

```
@@ -656,33 +656,40 @@ public void invoke(KafkaTransactionState transaction, IN next, Context context)
 	@Override
 	public void close() throws FlinkKafka011Exception {
-		final KafkaTransactionState currentTransaction = currentTransaction();
-		if (currentTransaction != null) {
-			// to avoid exceptions on aborting transactions with some pending records
-			flush(currentTransaction);
-
-			// normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
-			// we need to close it manually
-			switch (semantic) {
-				case EXACTLY_ONCE:
-					break;
-				case AT_LEAST_ONCE:
-				case NONE:
-					currentTransaction.producer.close();
-					break;
-			}
-		}
+		// First close the producer for current transaction.
 		try {
+			final KafkaTransactionState currentTransaction = currentTransaction();
+			if (currentTransaction != null) {
+				// to avoid exceptions on aborting transactions with some pending records
+				flush(currentTransaction);
+
+				// normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
+				// we need to close it manually
+				switch (semantic) {
+					case EXACTLY_ONCE:
+						break;
+					case AT_LEAST_ONCE:
+					case NONE:
+						currentTransaction.producer.close();
+						break;
+				}
+			}
 			super.close();
-		}
-		catch (Exception e) {
+		} catch (Exception e) {
 			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
+		} finally {
+			// We may have to close producer of the current transaction in case some exception was thrown before
+			// the normal close routine finishes.
+			if (currentTransaction() != null) {
+				IOUtils.closeQuietly(currentTransaction().producer);
+			}
+			// Make sure all the producers for pending transactions are closed.
+			pendingTransactions().forEach(transaction ->
+				IOUtils.closeQuietly(transaction.getValue().producer)
```

Review comment: Code format error; details can be found [here](https://api.travis-ci.org/v3/job/542303676/log.txt):

```
FlinkKafkaProducer011.java:[688] (regexp) RegexpSinglelineJava: Line has leading space characters; indentation should be performed with tabs only.
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
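For context, the checkstyle failure quoted above comes from a regexp-based rule that rejects space characters in indentation. A rough sketch of what such a tabs-only check does is below; the pattern is an assumption for illustration and may differ from the exact expression in Flink's checkstyle configuration.

```java
import java.util.regex.Pattern;

// Hypothetical sketch of a "tabs only" indentation check, similar in spirit
// to the RegexpSinglelineJava rule from the build log above.
public class LeadingSpaceCheck {

	// Assumed pattern: flags lines whose indentation contains a space
	// character, optionally after some leading tabs.
	private static final Pattern LEADING_SPACE = Pattern.compile("^\\t* ");

	public static boolean hasLeadingSpace(String line) {
		return LEADING_SPACE.matcher(line).find();
	}

	public static void main(String[] args) {
		System.out.println(hasLeadingSpace("\t\tsuper.close();"));   // tabs only: false
		System.out.println(hasLeadingSpace("    super.close();"));   // spaces: true
		System.out.println(hasLeadingSpace("\t    super.close();")); // space after tab: true
	}
}
```

A rule like this is why the extra indentation introduced by the lambda in the diff above has to use tabs rather than spaces.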
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8653: [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers are recycled when FlinkKafk…
sunjincheng121 commented on a change in pull request #8653: [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers are recycled when FlinkKafk… URL: https://github.com/apache/flink/pull/8653#discussion_r291847967

## File path: flink-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties

```
@@ -16,15 +16,14 @@
 # limitations under the License.

-log4j.rootLogger=INFO, testlogger
+log4j.rootLogger=OFF, testlogger
```

Review comment: Why should we change this?
[GitHub] [flink] sunjincheng121 commented on issue #8225: [FLINK-10984]Remove flink-shaded-hadoop from flink
sunjincheng121 commented on issue #8225: [FLINK-10984]Remove flink-shaded-hadoop from flink URL: https://github.com/apache/flink/pull/8225#issuecomment-500239090

Thanks for the review @zentol! I have rebased the code and will wait for CI to turn green.
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8225: [FLINK-10984]Remove flink-shaded-hadoop from flink
sunjincheng121 commented on a change in pull request #8225: [FLINK-10984]Remove flink-shaded-hadoop from flink URL: https://github.com/apache/flink/pull/8225#discussion_r291847332

## File path: flink-shaded-yarn-tests/pom.xml

```
@@ -146,42 +172,42 @@ under the License.
 com.google
-org.apache.flink.hadoop.shaded.com.google
+org.apache.flink.hadoop2.shaded.com.google
```

Review comment: Without this change, the `YARNHighAvailabilityITCase` test cannot succeed.
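For context, the strings being edited are shading relocation patterns. A minimal sketch of where such a pattern lives in a pom.xml is below; the plugin wrapper is an assumption for illustration, and only the `com.google` / `org.apache.flink.hadoop2.shaded.com.google` values are taken from the diff.

```xml
<!-- Hypothetical maven-shade-plugin fragment; only the relocation values
     come from the diff above. -->
<plugin>
	<groupId>org.apache.maven.plugins</groupId>
	<artifactId>maven-shade-plugin</artifactId>
	<configuration>
		<relocations>
			<relocation>
				<pattern>com.google</pattern>
				<shadedPattern>org.apache.flink.hadoop2.shaded.com.google</shadedPattern>
			</relocation>
		</relocations>
	</configuration>
</plugin>
```

A relocation like this rewrites every `com.google.*` class reference in the shaded jar to the relocated package, so the test jar's prefix must match the prefix the shaded Hadoop artifact was actually built with.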
[GitHub] [flink] flinkbot commented on issue #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics …
flinkbot commented on issue #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics … URL: https://github.com/apache/flink/pull/8668#issuecomment-500236796

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier