[GitHub] [flink] JingsongLi commented on a change in pull request #8435: [FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in blink

2019-06-09 Thread GitBox
JingsongLi commented on a change in pull request #8435: 
[FLINK-12443][table-planner-blink] Replace InternalType with LogicalType in 
blink
URL: https://github.com/apache/flink/pull/8435#discussion_r291888066
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
 ##
 @@ -55,4 +57,8 @@ class GenericRelDataType(
   override def hashCode(): Int = {
 genericType.hashCode()
   }
+
+  override def generateTypeString(sb: lang.StringBuilder, withDetail: Boolean): Unit = {
 
 Review comment:
   Now we delete `ArrayRelDataType` and use `ArraySqlType`, etc.
   NOTE: `equals` of `ArraySqlType` is:
   ```
   @Override public boolean equals(Object obj) {
     if (obj instanceof RelDataTypeImpl) {
       final RelDataTypeImpl that = (RelDataTypeImpl) obj;
       return this.digest.equals(that.digest);
     }
     return false;
   }
   ```
   So we must override `generateTypeString` in all Calcite extended types.
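   For illustration only (not code from this PR), a minimal Java sketch of what such an override could look like for a hypothetical extended type, assuming the usual Calcite pattern where `RelDataTypeImpl.computeDigest()` builds the digest from `generateTypeString`:
   ```
   import org.apache.calcite.rel.type.RelDataTypeImpl;

   /** Hypothetical extended type; the names here are placeholders, not the PR's classes. */
   class MyGenericRelDataType extends RelDataTypeImpl {
     private final Object genericType; // stand-in for the wrapped logical type

     MyGenericRelDataType(Object genericType) {
       this.genericType = genericType;
       computeDigest(); // RelDataTypeImpl derives `digest` from generateTypeString(...)
     }

     @Override
     protected void generateTypeString(StringBuilder sb, boolean withDetail) {
       // Include enough detail that two different generic types never produce the same
       // digest; otherwise the digest-based equals() above would treat them as equal.
       sb.append("ANY('").append(genericType).append("')");
     }
   }
   ```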


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] docete commented on a change in pull request #8626: [FLINK-12742] Add insert into partition grammar as hive dialect

2019-06-09 Thread GitBox
docete commented on a change in pull request #8626: [FLINK-12742] Add insert 
into partition grammar as hive dialect
URL: https://github.com/apache/flink/pull/8626#discussion_r291887668
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
 ##
 @@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.validate;
+
+import org.apache.calcite.sql.validate.SqlConformance;
+
+/** Sql conformance used for flink to set specific sql dialect parser. **/
+public enum FlinkSqlConformance implements SqlConformance {
+   /** Calcite's default SQL behavior. */
+   DEFAULT,
+
+   /** Conformance value that instructs Calcite to use SQL semantics
+* consistent with the Apache HIVE, but ignoring its more
+* inconvenient or controversial dicta. */
 
 Review comment:
   "dicta" typo of "dialect" ?




[GitHub] [flink] KurtYoung commented on issue #8673: blink first commit

2019-06-09 Thread GitBox
KurtYoung commented on issue #8673: blink first commit
URL: https://github.com/apache/flink/pull/8673#issuecomment-500300991
 
 
   @waywtd Please don't open this pull request




[GitHub] [flink] flinkbot commented on issue #8673: blink first commit

2019-06-09 Thread GitBox
flinkbot commented on issue #8673: blink first commit
URL: https://github.com/apache/flink/pull/8673#issuecomment-500300770
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] waywtd commented on issue #8673: blink first commit

2019-06-09 Thread GitBox
waywtd commented on issue #8673: blink first commit
URL: https://github.com/apache/flink/pull/8673#issuecomment-500300630
 
 
   123




[GitHub] [flink] flinkbot commented on issue #8672: blink first commit

2019-06-09 Thread GitBox
flinkbot commented on issue #8672: blink first commit
URL: https://github.com/apache/flink/pull/8672#issuecomment-500300573
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] waywtd opened a new pull request #8673: blink first commit

2019-06-09 Thread GitBox
waywtd opened a new pull request #8673: blink first commit
URL: https://github.com/apache/flink/pull/8673
 
 
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   




[GitHub] [flink] KurtYoung closed pull request #8672: blink first commit

2019-06-09 Thread GitBox
KurtYoung closed pull request #8672: blink first commit
URL: https://github.com/apache/flink/pull/8672
 
 
   




[jira] [Commented] (FLINK-12541) Add deploy a Python Flink job and session cluster on Kubernetes support.

2019-06-09 Thread Dian Fu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16859718#comment-16859718
 ] 

Dian Fu commented on FLINK-12541:
-

[~sunjincheng121] Thanks a lot for the suggestions. I have created a ticket FLINK-12788 for the part 2 changes.

> Add deploy a Python Flink job and session cluster on Kubernetes support.
> 
>
> Key: FLINK-12541
> URL: https://issues.apache.org/jira/browse/FLINK-12541
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Runtime / REST
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Add deploy a Python Flink job and session cluster on Kubernetes support.
> We need to have the same deployment step as the Java job. Please see: 
> [https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12789) Fix java docs in UserDefinedAggregateFunction

2019-06-09 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-12789:
---

 Summary: Fix java docs in UserDefinedAggregateFunction
 Key: FLINK-12789
 URL: https://issues.apache.org/jira/browse/FLINK-12789
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Hequn Cheng
Assignee: Hequn Cheng


We use \{{UserDefinedAggregateFunction}} as the base class for 
\{{TableAggregateFunction}} and \{{AggregateFunction}}. However, the java docs 
in \{{UserDefinedAggregateFunction}} are only dedicated for 
\{{AggregateFunction}}. 





[GitHub] [flink] waywtd opened a new pull request #8672: blink first commit

2019-06-09 Thread GitBox
waywtd opened a new pull request #8672: blink first commit
URL: https://github.com/apache/flink/pull/8672
 
 
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   




[GitHub] [flink] HuangXingBo commented on a change in pull request #8671: [FLINK-12787][python] Allow to specify directory in option -pyfs

2019-06-09 Thread GitBox
HuangXingBo commented on a change in pull request #8671: [FLINK-12787][python] 
Allow to specify directory in option -pyfs
URL: https://github.com/apache/flink/pull/8671#discussion_r291886091
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
 ##
 @@ -119,9 +119,9 @@ protected ProgramOptions(CommandLine line) throws 
CliArgsException {
// PythonDriver args: pym ${py-module} pyfs ${py-files} 
[optional] ${other args}.
// e.g. -pym AAA.fun -pyfs AAA.zip(CLI cmd) > pym 
AAA.fun -pyfs AAA.zip(PythonDriver args)
String[] newArgs = new String[args.length + 4];
-   newArgs[0] = PYMODULE_OPTION.getOpt();
+   newArgs[0] = "-" + PYMODULE_OPTION.getOpt();
newArgs[1] = 
line.getOptionValue(PYMODULE_OPTION.getOpt());
-   newArgs[2] = PYFILES_OPTION.getOpt();
+   newArgs[2] = "-" + PYFILES_OPTION.getOpt();
 
 Review comment:
   The comment describing the argument transformation should be updated accordingly, or it can be deleted.
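   For context, a small standalone sketch (not code from this PR) of why the leading `-` matters, assuming the rebuilt arguments are eventually parsed by an Apache Commons CLI parser (as the `PythonDriverOptionsParserFactory` in this PR does):
   ```
   import org.apache.commons.cli.CommandLine;
   import org.apache.commons.cli.DefaultParser;
   import org.apache.commons.cli.Option;
   import org.apache.commons.cli.Options;

   public class DashPrefixDemo {
     public static void main(String[] args) throws Exception {
       Options options = new Options();
       options.addOption(Option.builder("pym").hasArg().build());
       DefaultParser parser = new DefaultParser();

       // Without the leading "-", the token is treated as a plain argument, not an option.
       CommandLine noDash = parser.parse(options, new String[]{"pym", "my.module"});
       System.out.println(noDash.hasOption("pym"));          // false

       // With the "-" prefix, the option is recognized and its value is captured.
       CommandLine withDash = parser.parse(options, new String[]{"-pym", "my.module"});
       System.out.println(withDash.getOptionValue("pym"));   // my.module
     }
   }
   ```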




[GitHub] [flink] HuangXingBo commented on a change in pull request #8671: [FLINK-12787][python] Allow to specify directory in option -pyfs

2019-06-09 Thread GitBox
HuangXingBo commented on a change in pull request #8671: [FLINK-12787][python] 
Allow to specify directory in option -pyfs
URL: https://github.com/apache/flink/pull/8671#discussion_r291886132
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/python/client/PythonDriverOptionsParserFactory.java
 ##
 @@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.client;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Parser factory which generates a {@link PythonDriverOptions} from a given
+ * list of command line arguments.
+ */
+final class PythonDriverOptionsParserFactory implements ParserResultFactory<PythonDriverOptions> {
+
+   private static final Option PY_OPTION = Option.builder("py")
+   .longOpt("python")
+   .required(false)
+   .hasArg(true)
+   .argName("entrypoint python file")
+   .desc("Python script with the program entry point. " +
+   "The dependent resources can be configured with the 
`--pyFiles` option.")
+   .build();
+
+   private static final Option PYMODULE_OPTION = Option.builder("pym")
+   .longOpt("pyModule")
+   .required(false)
+   .hasArg(true)
+   .argName("entrypoint module name")
+   .desc("Python module with the program entry point. " +
+   "This option must be used in conjunction with 
`--pyFiles`.")
+   .build();
+
+   private static final Option PYFILES_OPTION = Option.builder("pyfs")
+   .longOpt("pyFiles")
+   .required(false)
+   .hasArg(true)
+   .argName("entrypoint python file")
+   .desc("Attach custom python files for job. " +
+   "Comma can be used as the separator to specify multiple 
files. " +
+   "The standard python resource file suffixes such as 
.py/.egg/.zip are all supported." +
+   "(eg: --pyFiles 
file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)")
+   .build();
+
+   @Override
+   public Options getOptions() {
+   final Options options = new Options();
+   options.addOption(PY_OPTION);
+   options.addOption(PYMODULE_OPTION);
+   options.addOption(PYFILES_OPTION);
+   return options;
+   }
+
+   @Override
+   public PythonDriverOptions createResult(@Nonnull CommandLine 
commandLine) throws FlinkParseException {
+   String entrypointModule = null;
+   final List<Path> pythonLibFiles = new ArrayList<>();
+
+   if (commandLine.hasOption(PY_OPTION.getOpt()) && 
commandLine.hasOption(PYMODULE_OPTION.getOpt())) {
+   throw new FlinkParseException("Cannot use options -py 
and -pym simultaneously.");
 
 Review comment:
   The simmilar logic has existed in ProgramOptions. Whether the similar code 
in ProgramOptions could be detelte. 




[GitHub] [flink] HuangXingBo commented on a change in pull request #8671: [FLINK-12787][python] Allow to specify directory in option -pyfs

2019-06-09 Thread GitBox
HuangXingBo commented on a change in pull request #8671: [FLINK-12787][python] 
Allow to specify directory in option -pyfs
URL: https://github.com/apache/flink/pull/8671#discussion_r291886055
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
 ##
 @@ -95,14 +95,14 @@ protected ProgramOptions(CommandLine line) throws 
CliArgsException {
int argIndex;
if (line.hasOption(PYFILES_OPTION.getOpt())) {
newArgs = new String[args.length + 4];
-   newArgs[2] = PYFILES_OPTION.getOpt();
+   newArgs[2] = "-" + PYFILES_OPTION.getOpt();
 
 Review comment:
   The comment describing the argument transformation should be updated accordingly, or it can be deleted.




[GitHub] [flink] docete commented on a change in pull request #8626: [FLINK-12742] Add insert into partition grammar as hive dialect

2019-06-09 Thread GitBox
docete commented on a change in pull request #8626: [FLINK-12742] Add insert 
into partition grammar as hive dialect
URL: https://github.com/apache/flink/pull/8626#discussion_r291885792
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dml/RichSqlInsert.java
 ##
 @@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dml;
+
+import org.apache.flink.sql.parser.SqlProperty;
+import org.apache.flink.sql.parser.ddl.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlParseException;
+
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.sql.validate.SqlValidatorTable;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.util.Pair;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.calcite.util.Static.RESOURCE;
+
+/** An {@link SqlInsert} that have some extension functions like partition, 
overwrite. **/
 
 Review comment:
   It seems we did not support INSERT OVERWRITE in the previous parser.




[GitHub] [flink] dianfu commented on issue #8609: [FLINK-12541][container][python] Add support for Python jobs in build script

2019-06-09 Thread GitBox
dianfu commented on issue #8609: [FLINK-12541][container][python] Add support 
for Python jobs in build script
URL: https://github.com/apache/flink/pull/8609#issuecomment-500295981
 
 
   @tillrohrmann Thanks a lot for your review. 
   Your suggestion makes much sense to me. I have created a dedicated JIRA 
[FLINK-12788](https://issues.apache.org/jira/browse/FLINK-12788) for this PR. 
Regarding the changes to `StandaloneJobClusterEntrypoint`, I agree that there should not be special logic for Python. I will revert that part of the changes.
   
   @sunjincheng121 Thanks a lot for your review. I have created a separate JIRA 
[FLINK-12787](https://issues.apache.org/jira/browse/FLINK-12787) for the 
PythonDriver improving. Then we can focus this PR on the build script changes 
for Python jobs support. 
   
   Will update the PR later today.




[jira] [Created] (FLINK-12788) Add support to run a Python job-specific cluster on Kubernetes

2019-06-09 Thread Dian Fu (JIRA)
Dian Fu created FLINK-12788:
---

 Summary: Add support to run a Python job-specific cluster on 
Kubernetes
 Key: FLINK-12788
 URL: https://issues.apache.org/jira/browse/FLINK-12788
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python, Deployment / Docker
Reporter: Dian Fu
Assignee: Dian Fu


As discussed in FLINK-12541, we need to support running a Python job-specific cluster on Kubernetes. To support this, we need to improve the job-specific docker image build scripts to support Python Table API jobs.





[GitHub] [flink] flinkbot commented on issue #8671: [FLINK-12787][python] Allow to specify directory in option -pyfs

2019-06-09 Thread GitBox
flinkbot commented on issue #8671: [FLINK-12787][python] Allow to specify 
directory in option -pyfs
URL: https://github.com/apache/flink/pull/8671#issuecomment-500293007
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] dianfu opened a new pull request #8671: [FLINK-12787][python] Allow to specify directory in option -pyfs

2019-06-09 Thread GitBox
dianfu opened a new pull request #8671: [FLINK-12787][python] Allow to specify 
directory in option -pyfs
URL: https://github.com/apache/flink/pull/8671
 
 
   
   ## What is the purpose of the change
   
   *This pull request adds support for specifying a directory in the option `-pyfs`*
   
   ## Brief change log
   
 - *Add support for specifying a directory in the option `-pyfs`*
 - *Improve PythonDriver to use CommandLineParser for the arguments parsing*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Existing tests in PythonDriverTest*
 - *Added tests in PythonDriverOptionsParserFactoryTest and 
PythonEnvUtilsTest*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[jira] [Updated] (FLINK-12787) Allow to specify directory in option -pyfs

2019-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12787:
---
Labels: pull-request-available  (was: )

> Allow to specify directory in option -pyfs
> --
>
> Key: FLINK-12787
> URL: https://issues.apache.org/jira/browse/FLINK-12787
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> Currently only files can be specified in the option `-pyfs`; we want to improve it to also allow specifying directories in `-pyfs`.





[jira] [Created] (FLINK-12787) Allow to specify directory in option -pyfs

2019-06-09 Thread Dian Fu (JIRA)
Dian Fu created FLINK-12787:
---

 Summary: Allow to specify directory in option -pyfs
 Key: FLINK-12787
 URL: https://issues.apache.org/jira/browse/FLINK-12787
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu


Currently only files can be specified in the option `-pyfs`; we want to improve it to also allow specifying directories in `-pyfs`.





[GitHub] [flink] sunjincheng121 removed a comment on issue #8609: [FLINK-12541][container][python] Add support for Python jobs in build script

2019-06-09 Thread GitBox
sunjincheng121 removed a comment on issue #8609: 
[FLINK-12541][container][python] Add support for Python jobs in build script
URL: https://github.com/apache/flink/pull/8609#issuecomment-500290745
 
 
   Thanks for the PR @dianfu 
   I think the suggestion from @tillrohrmann makes more sense to me: we should let the PR have its own JIRA.
   Furthermore, the improvement of `pyfs` can also go into a new PR (with a new JIRA).
   BTW: please rebase the code, and I will have another review :)
   What do you think?




[GitHub] [flink] sunjincheng121 commented on issue #8609: [FLINK-12541][container][python] Add support for Python jobs in build script

2019-06-09 Thread GitBox
sunjincheng121 commented on issue #8609: [FLINK-12541][container][python] Add 
support for Python jobs in build script
URL: https://github.com/apache/flink/pull/8609#issuecomment-500290745
 
 
   Thanks for the PR @dianfu 
   I think the suggestion from @tillrohrmann makes more sense to me: we should let the PR have its own JIRA.
   Furthermore, the improvement of `pyfs` can also go into a new PR (with a new JIRA).
   What do you think?




[GitHub] [flink] sunjincheng121 edited a comment on issue #8609: [FLINK-12541][container][python] Add support for Python jobs in build script

2019-06-09 Thread GitBox
sunjincheng121 edited a comment on issue #8609: 
[FLINK-12541][container][python] Add support for Python jobs in build script
URL: https://github.com/apache/flink/pull/8609#issuecomment-500290745
 
 
   Thanks for the PR @dianfu 
   I think the suggestion from @tillrohrmann makes more sense to me: we should let the PR have its own JIRA.
   Furthermore, the improvement of `pyfs` can also go into a new PR (with a new JIRA).
   BTW: please rebase the code, and I will have another review :)
   What do you think?




[GitHub] [flink] dianfu commented on issue #8641: [FLINK-12757][python] Improves the word_count example to use the descriptor API

2019-06-09 Thread GitBox
dianfu commented on issue #8641: [FLINK-12757][python] Improves the word_count 
example to use the descriptor API
URL: https://github.com/apache/flink/pull/8641#issuecomment-500289996
 
 
   @sunjincheng121 Thanks a lot for the review. Updated accordingly.




[GitHub] [flink] flinkbot commented on issue #8670: [hotfix][utils] Replace implemented FutureUtils#toJava with lib method

2019-06-09 Thread GitBox
flinkbot commented on issue #8670: [hotfix][utils] Replace implemented 
FutureUtils#toJava with lib method
URL: https://github.com/apache/flink/pull/8670#issuecomment-500289795
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] TisonKun opened a new pull request #8670: [hotfix][utils] Replace implemented FutureUtils#toJava with lib method

2019-06-09 Thread GitBox
TisonKun opened a new pull request #8670: [hotfix][utils] Replace implemented 
FutureUtils#toJava with lib method
URL: https://github.com/apache/flink/pull/8670
 
 
   ## What is the purpose of the change
   
   Replace the hand-written `FutureUtils#toJava` with Scala's library method.
   
   ## Brief change log
   
   Replace `FutureUtils#toJava` with 
`scala.compat.java8.FutureConverters#toJava`.
   
   For the unchecked upcast, see https://github.com/scala/scala-java8-compat/pull/152; it should be OK in this case.
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   
   cc @zentol @GJL 
   




[GitHub] [flink] sunjincheng121 commented on a change in pull request #8641: [FLINK-12757][python] Improves the word_count example to use the descriptor API

2019-06-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8641: 
[FLINK-12757][python] Improves the word_count example to use the descriptor API
URL: https://github.com/apache/flink/pull/8641#discussion_r291877199
 
 

 ##
 File path: flink-python/pyflink/table/examples/batch/word_count.py
 ##
 @@ -39,41 +34,40 @@ def word_count():
   "License you may not use this file except in compliance " \
   "with the License"
 
-with open(source_path, 'w') as f:
-for word in content.split(" "):
-f.write(",".join([word, "1"]))
-f.write("\n")
-f.flush()
-f.close()
-
 t_config = TableConfig.Builder().as_batch_execution().build()
 t_env = TableEnvironment.create(t_config)
 
-field_names = ["word", "cout"]
-field_types = [DataTypes.STRING(), DataTypes.BIGINT()]
-
-# register Orders table in table environment
-t_env.register_table_source(
-"Word",
-CsvTableSource(source_path, field_names, field_types))
-
 # register Results table in table environment
 tmp_dir = tempfile.gettempdir()
-tmp_csv = tmp_dir + '/streaming2.csv'
-if os.path.isfile(tmp_csv):
-os.remove(tmp_csv)
+result_dir = tmp_dir + '/result'
+if os.path.exists(result_dir):
+try:
+shutil.rmtree(result_dir)
 
 Review comment:
   we should check the file with the following code:
   ```
   if os.path.isfile(result_dir):
       os.remove(result_dir)
   else:
       shutil.rmtree(result_dir)
   ```




[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-09 Thread GitBox
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add 
shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r291876341
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java
 ##
 @@ -47,14 +70,33 @@
 * @throws RuntimeException if the Params doesn't contains the specific 
parameter, while the
 *  param is not optional but has no default 
value in the {@code info}
 */
-   @SuppressWarnings("unchecked")
public  V get(ParamInfo info) {
-   V value = (V) paramMap.getOrDefault(info.getName(), 
info.getDefaultValue());
-   if (value == null && !info.isOptional() && 
!info.hasDefaultValue()) {
-   throw new RuntimeException(info.getName() +
-   " not exist which is not optional and don't 
have a default value");
+   Stream stream = getParamNameAndAlias(info)
+   .filter(this.params::containsKey)
+   .map(x -> this.params.get(x))
+   .map(x -> valueFromJson(x, info.getValueClass()))
+   .limit(1);
 
 Review comment:
   If we don't use a stream object, we need more code.
   
   Because the value of a parameter may be null, and the findFirst method is not friendly to null, we chose the limit(.) method.
   
   When the user sets null as the value of a param, using the findFirst method of the Stream class will throw a NullPointerException.
   Looking deeper into the code, findFirst returns an instance of Optional,
   but Optional can only hold a non-null value.
   For example, the following code will throw a NullPointerException.
   
   Stream.of(null, "2").findFirst();




[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-09 Thread GitBox
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add 
shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r291874460
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java
 ##
 @@ -93,59 +126,105 @@
 * @param   the type of the specific parameter
 */
public  void remove(ParamInfo info) {
-   paramMap.remove(info.getName());
+   params.remove(info.getName());
+   for (String a : info.getAlias()) {
+   params.remove(a);
+   }
}
 
-   /**
-* Creates and returns a deep clone of this Params.
-*
-* @return a deep clone of this Params
-*/
-   public Params clone() {
-   Params newParams = new Params();
-   newParams.paramMap.putAll(this.paramMap);
-   return newParams;
+   public  boolean contains(ParamInfo paramInfo) {
+   return params.containsKey(paramInfo.getName()) ||
+   
Arrays.stream(paramInfo.getAlias()).anyMatch(params::containsKey);
}
 
/**
-* Returns a json containing all parameters in this Params. The json 
should be human-readable if
-* possible.
+* Creates and returns a deep clone of this Params.
 *
 * @return a json containing all parameters in this Params
 */
public String toJson() {
-   ObjectMapper mapper = new ObjectMapper();
-   Map stringMap = new HashMap<>();
try {
-   for (Map.Entry e : paramMap.entrySet()) 
{
-   stringMap.put(e.getKey(), 
mapper.writeValueAsString(e.getValue()));
-   }
-   return mapper.writeValueAsString(stringMap);
+   return mapper.writeValueAsString(params);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize params 
to json", e);
}
}
 
/**
 * Restores the parameters from the given json. The parameters should 
be exactly the same with
-* the one who was serialized to the input json after the restoration. 
The class mapping of the
-* parameters in the json is required because it is hard to directly 
restore a param of a user
-* defined type. Params will be treated as String if it doesn't exist 
in the {@code classMap}.
+* the one who was serialized to the input json after the restoration.
 *
-* @param json the json String to restore from
-* @param classMap the classes of the parameters contained in the json
+* @param json the json String to restore from
 */
@SuppressWarnings("unchecked")
-   public void loadJson(String json, Map> classMap) {
+   public void loadJson(String json) {
ObjectMapper mapper = new ObjectMapper();
+   Map params;
try {
-   Map m = mapper.readValue(json, 
Map.class);
-   for (Map.Entry e : m.entrySet()) {
-   Class valueClass = 
classMap.getOrDefault(e.getKey(), String.class);
-   paramMap.put(e.getKey(), 
mapper.readValue(e.getValue(), valueClass));
+   params = mapper.readValue(json, Map.class);
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to deserialize 
json:" + json, e);
+   }
+   this.params.clear();
+   this.params.putAll(params);
+   }
+
+   public static Params fromJson(String json) {
+   Params params = new Params();
+   params.loadJson(json);
+   return params;
+   }
+
+   public Params merge(Params otherParams) {
+   if (otherParams != null) {
+   this.params.putAll(otherParams.params);
+   }
+   return this;
+   }
+
+   @Override
+   public Params clone() {
+   Params newParams = new Params();
+   newParams.params.putAll(this.params);
+   return newParams;
+   }
+
+   private void assertMapperInited() {
+   if (mapper == null) {
+   mapper = new ObjectMapper();
+   }
+   }
+
+   private String valueToJson(Object value) {
+   assertMapperInited();
+   try {
+   if (value == null) {
+   return null;
+   }
+   return mapper.writeValueAsString(value);
+   } catch (JsonProcessingException e) {
+   throw new RuntimeException("Failed to serialize to 
json:" + value, e);
+ 

[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-09 Thread GitBox
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add 
shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r291874378
 
 

 ##
 File path: 
flink-ml-parent/flink-ml/src/main/java/org/apache/flink/ml/params/shared/colname/HasCategoricalColNames.java
 ##
 @@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.params.shared.colname;
+
+import org.apache.flink.ml.api.misc.param.ParamInfo;
+import org.apache.flink.ml.api.misc.param.ParamInfoFactory;
+import org.apache.flink.ml.params.BaseWithParam;
+
+/**
+ * Names of the categorical columns used for training in the input table.
+ */
+public interface HasCategoricalColNames extends BaseWithParam  {
+
+   ParamInfo  CATEGORICAL_COL_NAMES = ParamInfoFactory
+   .createParamInfo("categoricalColNames", String[].class)
+   .setDescription("Names of the categorical columns used for 
training in the input table")
+   .setHasDefaultValue(new String[] {})
 
 Review comment:
   I removed this default value; it is not necessary and may confuse users.




[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-09 Thread GitBox
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add 
shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r291873818
 
 

 ##
 File path: 
flink-ml-parent/flink-ml/src/main/java/org/apache/flink/ml/params/shared/HasVectorSizeDv100.java
 ##
 @@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.params.shared;
+
+import org.apache.flink.ml.api.misc.param.ParamInfo;
+import org.apache.flink.ml.api.misc.param.ParamInfoFactory;
+import org.apache.flink.ml.params.BaseWithParam;
+
+/**
+ * Vector size of embedding.
+ */
+public interface HasVectorSizeDv100 extends BaseWithParam  {
+   /**
+* @cn The vector length of the embedding
 
 Review comment:
   Thanks, removed.




[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-09 Thread GitBox
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add 
shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r291873656
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java
 ##
 @@ -47,14 +70,33 @@
 * @throws RuntimeException if the Params doesn't contains the specific 
parameter, while the
 *  param is not optional but has no default 
value in the {@code info}
 */
-   @SuppressWarnings("unchecked")
public  V get(ParamInfo info) {
-   V value = (V) paramMap.getOrDefault(info.getName(), 
info.getDefaultValue());
-   if (value == null && !info.isOptional() && 
!info.hasDefaultValue()) {
-   throw new RuntimeException(info.getName() +
-   " not exist which is not optional and don't 
have a default value");
+   Stream stream = getParamNameAndAlias(info)
 
 Review comment:
   I changed the name to "paramValue". 




[GitHub] [flink] xuyang1706 commented on issue #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-09 Thread GitBox
xuyang1706 commented on issue #8632: [FLINK-12744][ml] add shared params in ml 
package
URL: https://github.com/apache/flink/pull/8632#issuecomment-500284364
 
 
   > Hi, @xuyang1706 thanks for your work!
   > I left a few comments to your changes, could you look into when you will 
have a chance.
   > Thanks!
   
   Hi @ex00, sorry for the late reply, I just came back from my vacation. 
   Thanks for your helpful comments! I have updated the code and replied to them inline.





[GitHub] [flink] lirui-apache commented on a change in pull request #8616: [FLINK-12718][hive] allow users to specify hive-site.xml location to configure hive metastore client in HiveCatalog

2019-06-09 Thread GitBox
lirui-apache commented on a change in pull request #8616: [FLINK-12718][hive] 
allow users to specify hive-site.xml location to configure hive metastore 
client in HiveCatalog
URL: https://github.com/apache/flink/pull/8616#discussion_r291872326
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -109,27 +114,51 @@
 
private HiveMetastoreClientWrapper client;
 
-   public HiveCatalog(String catalogName, String hivemetastoreURI) {
-   this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
+   public HiveCatalog(String catalogName, @Nullable String 
defaultDatabase, @Nullable String hiveSiteFilePath) {
+   this(catalogName,
+   defaultDatabase == null ? DEFAULT_DB : defaultDatabase,
+   getHiveConf(hiveSiteFilePath));
}
 
-   public HiveCatalog(String catalogName, HiveConf hiveConf) {
-   this(catalogName, DEFAULT_DB, hiveConf);
+   public HiveCatalog(String catalogName, @Nullable String 
defaultDatabase, @Nullable URL hiveSiteUrl) {
+   this(catalogName,
+   defaultDatabase == null ? DEFAULT_DB : defaultDatabase,
+   getHiveConf(hiveSiteUrl));
}
 
-   public HiveCatalog(String catalogName, String defaultDatabase, HiveConf 
hiveConf) {
-   super(catalogName, defaultDatabase);
-   this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
+   public HiveCatalog(String catalogName, @Nullable String 
defaultDatabase, @Nullable HiveConf hiveConf) {
+   super(catalogName, defaultDatabase == null ? DEFAULT_DB : 
defaultDatabase);
+
+   this.hiveConf = hiveConf == null ? getHiveConf("") : hiveConf;
 
LOG.info("Created HiveCatalog '{}'", catalogName);
}
 
-   private static HiveConf getHiveConf(String hiveMetastoreURI) {
-   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), 
"hiveMetastoreURI cannot be null or empty");
+   private static HiveConf getHiveConf(String filePath) {
+
+   URL url = null;
+
+   if (!StringUtils.isNullOrWhitespaceOnly(filePath)) {
+   try {
+   url = new File(filePath).toURI().toURL();
+
+   LOG.info("Successfully loaded '{}'", filePath);
+
+   } catch (MalformedURLException e) {
+   throw new CatalogException(
+   String.format("Failed to get 
hive-site.xml from the given path '%s'", filePath), e);
+   }
+   }
+
+   return getHiveConf(url);
+   }
+
+   private static HiveConf getHiveConf(URL hiveSiteUrl) {
+   LOG.info("Setting hive-site location as {}", hiveSiteUrl);
+
+   HiveConf.setHiveSiteLocation(hiveSiteUrl);
 
 Review comment:
   Wondering whether this will incur any concurrency issues, e.g. when users access 
tables from different Hive catalogs? I think at least some constructors of 
HiveConf are no longer safe to use after this change, like `HiveConf()` and 
`HiveConf(Class cls)`, which automatically load the specified hive-site.xml.
   I think it's better not to rely on static fields of HiveConf. Instead, we 
can load the hive-site (perhaps `Properties::loadFromXML`?) ourselves and 
`HiveConf::hiveSiteURL` should always be set to null.
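   A rough sketch of that direction (illustrative only, not the PR's code; the 
helper name `createHiveConf` and the use of Hadoop's `Configuration#addResource(URL)` 
instead of `Properties::loadFromXML` are assumptions on my side):
   ```java
   // Illustrative sketch: merge the given hive-site.xml into a fresh HiveConf via
   // Configuration#addResource, so HiveConf's static hive-site location is never mutated.
   import java.net.URL;

   import javax.annotation.Nullable;

   import org.apache.hadoop.hive.conf.HiveConf;

   public final class HiveConfLoader {

       private HiveConfLoader() {}

       public static HiveConf createHiveConf(@Nullable URL hiveSiteUrl) {
           HiveConf hiveConf = new HiveConf();
           if (hiveSiteUrl != null) {
               // addResource parses the Hadoop-style <configuration> XML and overlays
               // its properties onto this conf instance only.
               hiveConf.addResource(hiveSiteUrl);
           }
           return hiveConf;
       }
   }
   ```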


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12765) Bookkeeping of available resources of allocated slots in SlotPool.

2019-06-09 Thread Tony Xintong Song (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tony Xintong Song updated FLINK-12765:
--
Summary: Bookkeeping of available resources of allocated slots in SlotPool. 
 (was: JobMaster calculates resource needs and requests slot for the entire 
slot sharing group.)

> Bookkeeping of available resources of allocated slots in SlotPool.
> --
>
> Key: FLINK-12765
> URL: https://issues.apache.org/jira/browse/FLINK-12765
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.8.0, 1.9.0
>Reporter: Tony Xintong Song
>Assignee: Yun Gao
>Priority: Major
> Fix For: 1.9.0
>
>
> In this version, a task will always request a slot with its own resource need. 
> If the resource need is less than the default slot resource, it will always 
> be allocated to a default-sized slot. 
>  
> The extra resources in the slot leave chances for other tasks within the 
> same slot sharing group to fit in. To take these chances, the SlotPool will 
> maintain the available resources of each allocated slot. The available resource 
> of an allocated slot should always be the total resource of the slot minus the 
> resources of tasks already assigned onto the slot. In this way, the SlotPool 
> would be able to determine whether another task can fit into the slot. If a 
> task cannot fit into the slot, for a slot sharing group the SlotPool should 
> request another slot from the ResourceManager, and for a colocation group it 
> should fail the job.
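
A purely illustrative sketch of the bookkeeping described above; the class and 
method names (`SlotResourceBookkeeper`, `tryAssign`) are hypothetical and not Flink 
APIs, and only a single CPU dimension is tracked for brevity:

```java
import java.util.HashMap;
import java.util.Map;

// Tracks the remaining (available) resource of each allocated slot so the
// SlotPool can decide whether another task still fits into it.
final class SlotResourceBookkeeper {

    // available resource per allocated slot, keyed by an opaque slot identifier
    private final Map<String, Double> availableCpuPerSlot = new HashMap<>();

    void onSlotAllocated(String slotId, double totalCpu) {
        // a newly allocated slot starts with its full resource available
        availableCpuPerSlot.put(slotId, totalCpu);
    }

    // Returns true and deducts the task's resource if the task fits into the slot;
    // otherwise returns false, and the caller should request a new slot (slot
    // sharing group) or fail the job (colocation group).
    boolean tryAssign(String slotId, double taskCpu) {
        double available = availableCpuPerSlot.getOrDefault(slotId, 0.0);
        if (taskCpu <= available) {
            availableCpuPerSlot.put(slotId, available - taskCpu);
            return true;
        }
        return false;
    }
}
```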



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12744) ML common parameters

2019-06-09 Thread Xu Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xu Yang updated FLINK-12744:

Description: 
We defined some commonly used parameters for machine-learning algorithms.

 

- *add ML common parameters*
 - *this is sub pr of #8586*
 - *change behavior when use default constructor of param factory*
 - *add shared params in ml package*
 - *add flink-ml module*

  was:We defined some commonly used parameters for machine-learning algorithms.


> ML common parameters
> 
>
> Key: FLINK-12744
> URL: https://issues.apache.org/jira/browse/FLINK-12744
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Xu Yang
>Assignee: Xu Yang
>Priority: Major
>
> We defined some commonly used parameters for machine-learning algorithms.
>  
> - *add ML common parameters*
>  - *this is sub pr of #8586*
>  - *change behavior when use default constructor of param factory*
>  - *add shared params in ml package*
>  - *add flink-ml module*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12744) ML common parameters

2019-06-09 Thread Xu Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xu Yang updated FLINK-12744:

Description: 
We defined some commonly used parameters for machine-learning algorithms.
 - *add ML common parameters*
 - *this is sub pr of #8586*
 - *change behavior when use default constructor of param factory*
 - *add shared params in ml package*
 - *add flink-ml module*

  was:
We defined some commonly used parameters for machine-learning algorithms.

 

- *add ML common parameters*
 - *this is sub pr of #8586*
 - *change behavior when use default constructor of param factory*
 - *add shared params in ml package*
 - *add flink-ml module*


> ML common parameters
> 
>
> Key: FLINK-12744
> URL: https://issues.apache.org/jira/browse/FLINK-12744
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Xu Yang
>Assignee: Xu Yang
>Priority: Major
>
> We defined some commonly used parameters for machine-learning algorithms.
>  - *add ML common parameters*
>  - *this is sub pr of #8586*
>  - *change behavior when use default constructor of param factory*
>  - *add shared params in ml package*
>  - *add flink-ml module*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12744) ML common parameters

2019-06-09 Thread Xu Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xu Yang updated FLINK-12744:

Description: 
We defined some commonly used parameters for machine-learning algorithms.
 - *add ML common parameters*
 - *change behavior when use default constructor of param factory*
 - *add shared params in ml package*
 - *add flink-ml module*

  was:
We defined some commonly used parameters for machine-learning algorithms.
 - *add ML common parameters*
 - *this is sub pr of #8586*
 - *change behavior when use default constructor of param factory*
 - *add shared params in ml package*
 - *add flink-ml module*


> ML common parameters
> 
>
> Key: FLINK-12744
> URL: https://issues.apache.org/jira/browse/FLINK-12744
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Xu Yang
>Assignee: Xu Yang
>Priority: Major
>
> We defined some commonly used parameters for machine-learning algorithms.
>  - *add ML common parameters*
>  - *change behavior when use default constructor of param factory*
>  - *add shared params in ml package*
>  - *add flink-ml module*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 commented on issue #8669: [FLINK-11147][table][docs] Add documentation for TableAggregate Function

2019-06-09 Thread GitBox
sunjincheng121 commented on issue #8669: [FLINK-11147][table][docs] Add 
documentation for TableAggregate Function
URL: https://github.com/apache/flink/pull/8669#issuecomment-500279807
 
 
   @flinkbot approve description


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator

2019-06-09 Thread GitBox
JingsongLi commented on a change in pull request #8629: 
[FLINK-12088][table-runtime-blink] Introduce unbounded streaming 
inner/left/right/full join operator
URL: https://github.com/apache/flink/pull/8629#discussion_r291869732
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/StreamingJoinOperator.java
 ##
 @@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join.stream;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataformat.util.BaseRowUtil;
+import org.apache.flink.table.generated.GeneratedJoinCondition;
+import org.apache.flink.table.generated.JoinCondition;
+import org.apache.flink.table.runtime.join.NullAwareJoinHelper;
+import org.apache.flink.table.runtime.join.stream.state.JoinInputSideSpec;
+import org.apache.flink.table.runtime.join.stream.state.JoinRecordStateView;
+import org.apache.flink.table.runtime.join.stream.state.JoinRecordStateViews;
+import 
org.apache.flink.table.runtime.join.stream.state.OuterJoinRecordStateView;
+import 
org.apache.flink.table.runtime.join.stream.state.OuterJoinRecordStateViews;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.IterableIterator;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Streaming unbounded Join operator which supports INNER/LEFT/RIGHT/FULL JOIN.
+ */
+public class StreamingJoinOperator extends AbstractStreamOperator
+   implements TwoInputStreamOperator {
+
+   private static final long serialVersionUID = -376944622236540545L;
+
+   private final BaseRowTypeInfo leftType;
+   private final BaseRowTypeInfo rightType;
+   private final GeneratedJoinCondition generatedJoinCondition;
+
+   private final JoinInputSideSpec leftInputSideSpec;
+   private final JoinInputSideSpec rightInputSideSpec;
+
+   // whether left side is outer side, e.g. left is outer but right is not 
when LEFT OUTER JOIN
+   private final boolean leftIsOuter;
+   // whether right side is outer side, e.g. right is outer but left is 
not when RIGHT OUTER JOIN
+   private final boolean rightIsOuter;
+
+   /**
+* Should filter null keys.
+*/
+   private final int[] nullFilterKeys;
+
+   /**
+* No keys need to filter null.
+*/
+   private final boolean nullSafe;
+
+   /**
+* Filter null to all keys.
+*/
+   private final boolean filterAllNulls;
+
+   private final long minRetentionTime;
+   private final boolean stateCleaningEnabled;
+
+   private transient JoinCondition joinCondition;
+   private transient TimestampedCollector collector;
+
+   private transient JoinedRow outRow;
+   private transient BaseRow leftNullRow;
+   private transient BaseRow rightNullRow;
+
+   // left join state
+   private transient JoinRecordStateView leftRecordStateView;
+   // right join state
+   private transient JoinRecordStateView rightRecordStateView;
+
+   public StreamingJoinOperator(
+   BaseRowTypeInfo leftType,
+   BaseRowTypeInfo rightType,
+   GeneratedJoinCondition generatedJoinCondition,
+   JoinInputSideSpec leftInputSideSpec,
+   JoinInputSideSpec rightInputSideSpec,
+   boolean leftIsOuter,
+   boolean rightIsOuter,
+   boolean[] filterNullKeys,
+   long minRetentionTime) {
+   this.leftType = leftType;
+   this.rightType 

[GitHub] [flink] JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator

2019-06-09 Thread GitBox
JingsongLi commented on a change in pull request #8629: 
[FLINK-12088][table-runtime-blink] Introduce unbounded streaming 
inner/left/right/full join operator
URL: https://github.com/apache/flink/pull/8629#discussion_r291867027
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoin.scala
 ##
 @@ -104,6 +110,14 @@ class StreamExecJoin(
 new StreamExecJoin(cluster, traitSet, left, right, conditionExpr, joinType)
   }
 
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super
+  .explainTerms(pw)
+  .item("leftInputSpec", analyzeJoinInput(left))
 
 Review comment:
   Is `JoinInputSideSpec` useful to `explainTerms`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator

2019-06-09 Thread GitBox
JingsongLi commented on a change in pull request #8629: 
[FLINK-12088][table-runtime-blink] Introduce unbounded streaming 
inner/left/right/full join operator
URL: https://github.com/apache/flink/pull/8629#discussion_r291867330
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoin.scala
 ##
 @@ -123,7 +137,109 @@ class StreamExecJoin(
 
   override protected def translateToPlanInternal(
   tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
-throw new TableException("Implements this")
+
+val tableConfig = tableEnv.getConfig
+val returnType = FlinkTypeFactory.toInternalRowType(getRowType).toTypeInfo
+
+val leftTransform = getInputNodes.get(0).translateToPlan(tableEnv)
+  .asInstanceOf[StreamTransformation[BaseRow]]
+val rightTransform = getInputNodes.get(1).translateToPlan(tableEnv)
+  .asInstanceOf[StreamTransformation[BaseRow]]
+
+val leftType = leftTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
+val rightType = rightTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
+
+val (leftJoinKey, rightJoinKey) =
+  JoinUtil.checkAndGetJoinKeys(keyPairs, getLeft, getRight, allowEmptyKey 
= true)
+
+val leftSelect = KeySelectorUtil.getBaseRowSelector(leftJoinKey, leftType)
+val rightSelect = KeySelectorUtil.getBaseRowSelector(rightJoinKey, 
rightType)
+
+val leftInputSpec = analyzeJoinInput(left)
+val rightInputSpec = analyzeJoinInput(right)
+
+val generatedCondition = JoinUtil.generateConditionFunction(
+  tableConfig,
+  cluster.getRexBuilder,
+  getJoinInfo,
+  TypeConverters.createInternalTypeFromTypeInfo(leftType),
+  TypeConverters.createInternalTypeFromTypeInfo(rightType))
+
+if (joinType == JoinRelType.ANTI || joinType == JoinRelType.SEMI) {
+  throw new TableException("SEMI/ANTI Join is not supported yet.")
+}
+
+val leftIsOuter = joinType == JoinRelType.LEFT || joinType == 
JoinRelType.FULL
+val rightIsOuter = joinType == JoinRelType.RIGHT || joinType == 
JoinRelType.FULL
+val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
+
+val operator = new StreamingJoinOperator(
+  leftType,
+  rightType,
+  generatedCondition,
+  leftInputSpec,
+  rightInputSpec,
+  leftIsOuter,
+  rightIsOuter,
+  filterNulls,
+  minRetentionTime)
+
+val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
+  leftTransform,
+  rightTransform,
+  "Join",
 
 Review comment:
   Is the transformation name too simple? Should we add some condition message like the batch one:
   ```
 private def getOperatorName: String = if (getCondition != null) {
   val inFields = inputRowType.getFieldNames.toList
   s"SortMergeJoin(where: ${
 getExpressionString(getCondition, inFields, None, 
ExpressionFormat.Infix)})"
 } else {
   "SortMergeJoin"
 }
   ```
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator

2019-06-09 Thread GitBox
JingsongLi commented on a change in pull request #8629: 
[FLINK-12088][table-runtime-blink] Introduce unbounded streaming 
inner/left/right/full join operator
URL: https://github.com/apache/flink/pull/8629#discussion_r291866569
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoin.scala
 ##
 @@ -104,6 +110,14 @@ class StreamExecJoin(
 new StreamExecJoin(cluster, traitSet, left, right, conditionExpr, joinType)
   }
 
+
 
 Review comment:
   remove empty line


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator

2019-06-09 Thread GitBox
JingsongLi commented on a change in pull request #8629: 
[FLINK-12088][table-runtime-blink] Introduce unbounded streaming 
inner/left/right/full join operator
URL: https://github.com/apache/flink/pull/8629#discussion_r291867580
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoin.scala
 ##
 @@ -123,7 +137,109 @@ class StreamExecJoin(
 
   override protected def translateToPlanInternal(
   tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
-throw new TableException("Implements this")
+
+val tableConfig = tableEnv.getConfig
+val returnType = FlinkTypeFactory.toInternalRowType(getRowType).toTypeInfo
+
+val leftTransform = getInputNodes.get(0).translateToPlan(tableEnv)
+  .asInstanceOf[StreamTransformation[BaseRow]]
+val rightTransform = getInputNodes.get(1).translateToPlan(tableEnv)
+  .asInstanceOf[StreamTransformation[BaseRow]]
+
+val leftType = leftTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
+val rightType = rightTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
+
+val (leftJoinKey, rightJoinKey) =
+  JoinUtil.checkAndGetJoinKeys(keyPairs, getLeft, getRight, allowEmptyKey 
= true)
+
+val leftSelect = KeySelectorUtil.getBaseRowSelector(leftJoinKey, leftType)
+val rightSelect = KeySelectorUtil.getBaseRowSelector(rightJoinKey, 
rightType)
+
+val leftInputSpec = analyzeJoinInput(left)
+val rightInputSpec = analyzeJoinInput(right)
+
+val generatedCondition = JoinUtil.generateConditionFunction(
+  tableConfig,
+  cluster.getRexBuilder,
+  getJoinInfo,
+  TypeConverters.createInternalTypeFromTypeInfo(leftType),
+  TypeConverters.createInternalTypeFromTypeInfo(rightType))
+
+if (joinType == JoinRelType.ANTI || joinType == JoinRelType.SEMI) {
+  throw new TableException("SEMI/ANTI Join is not supported yet.")
+}
+
+val leftIsOuter = joinType == JoinRelType.LEFT || joinType == 
JoinRelType.FULL
+val rightIsOuter = joinType == JoinRelType.RIGHT || joinType == 
JoinRelType.FULL
+val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
+
+val operator = new StreamingJoinOperator(
+  leftType,
+  rightType,
+  generatedCondition,
+  leftInputSpec,
+  rightInputSpec,
+  leftIsOuter,
+  rightIsOuter,
+  filterNulls,
+  minRetentionTime)
+
+val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
+  leftTransform,
+  rightTransform,
+  "Join",
+  operator,
+  returnType,
+  leftTransform.getParallelism)
+
+if (leftJoinKey.isEmpty) {
+  ret.setParallelism(1)
+  ret.setMaxParallelism(1)
+}
+
+// set KeyType and Selector for state
+ret.setStateKeySelectors(leftSelect, rightSelect)
+
ret.setStateKeyType(leftSelect.asInstanceOf[ResultTypeQueryable[_]].getProducedType)
+ret
   }
 
+  private def analyzeJoinInput(input: RelNode): JoinInputSideSpec = {
 
 Review comment:
   This is somewhat similar to `needsUpdatesAsRetraction`; should we extract a common method?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator

2019-06-09 Thread GitBox
JingsongLi commented on a change in pull request #8629: 
[FLINK-12088][table-runtime-blink] Introduce unbounded streaming 
inner/left/right/full join operator
URL: https://github.com/apache/flink/pull/8629#discussion_r291867419
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoin.scala
 ##
 @@ -123,7 +137,109 @@ class StreamExecJoin(
 
   override protected def translateToPlanInternal(
   tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
-throw new TableException("Implements this")
+
+val tableConfig = tableEnv.getConfig
+val returnType = FlinkTypeFactory.toInternalRowType(getRowType).toTypeInfo
+
+val leftTransform = getInputNodes.get(0).translateToPlan(tableEnv)
+  .asInstanceOf[StreamTransformation[BaseRow]]
+val rightTransform = getInputNodes.get(1).translateToPlan(tableEnv)
+  .asInstanceOf[StreamTransformation[BaseRow]]
+
+val leftType = leftTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
+val rightType = rightTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
+
+val (leftJoinKey, rightJoinKey) =
+  JoinUtil.checkAndGetJoinKeys(keyPairs, getLeft, getRight, allowEmptyKey 
= true)
+
+val leftSelect = KeySelectorUtil.getBaseRowSelector(leftJoinKey, leftType)
+val rightSelect = KeySelectorUtil.getBaseRowSelector(rightJoinKey, 
rightType)
+
+val leftInputSpec = analyzeJoinInput(left)
+val rightInputSpec = analyzeJoinInput(right)
+
+val generatedCondition = JoinUtil.generateConditionFunction(
+  tableConfig,
+  cluster.getRexBuilder,
+  getJoinInfo,
+  TypeConverters.createInternalTypeFromTypeInfo(leftType),
+  TypeConverters.createInternalTypeFromTypeInfo(rightType))
+
+if (joinType == JoinRelType.ANTI || joinType == JoinRelType.SEMI) {
+  throw new TableException("SEMI/ANTI Join is not supported yet.")
+}
+
+val leftIsOuter = joinType == JoinRelType.LEFT || joinType == 
JoinRelType.FULL
+val rightIsOuter = joinType == JoinRelType.RIGHT || joinType == 
JoinRelType.FULL
+val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
+
+val operator = new StreamingJoinOperator(
+  leftType,
+  rightType,
+  generatedCondition,
+  leftInputSpec,
+  rightInputSpec,
+  leftIsOuter,
+  rightIsOuter,
+  filterNulls,
+  minRetentionTime)
+
+val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
+  leftTransform,
+  rightTransform,
+  "Join",
+  operator,
+  returnType,
+  leftTransform.getParallelism)
+
+if (leftJoinKey.isEmpty) {
+  ret.setParallelism(1)
+  ret.setMaxParallelism(1)
+}
+
+// set KeyType and Selector for state
+ret.setStateKeySelectors(leftSelect, rightSelect)
+
ret.setStateKeyType(leftSelect.asInstanceOf[ResultTypeQueryable[_]].getProducedType)
 
 Review comment:
   `leftSelect.asInstanceOf[ResultTypeQueryable[_]]` is a useless cast.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator

2019-06-09 Thread GitBox
JingsongLi commented on a change in pull request #8629: 
[FLINK-12088][table-runtime-blink] Introduce unbounded streaming 
inner/left/right/full join operator
URL: https://github.com/apache/flink/pull/8629#discussion_r291869144
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/StreamingJoinOperator.java
 ##
 @@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join.stream;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataformat.util.BaseRowUtil;
+import org.apache.flink.table.generated.GeneratedJoinCondition;
+import org.apache.flink.table.generated.JoinCondition;
+import org.apache.flink.table.runtime.join.NullAwareJoinHelper;
+import org.apache.flink.table.runtime.join.stream.state.JoinInputSideSpec;
+import org.apache.flink.table.runtime.join.stream.state.JoinRecordStateView;
+import org.apache.flink.table.runtime.join.stream.state.JoinRecordStateViews;
+import 
org.apache.flink.table.runtime.join.stream.state.OuterJoinRecordStateView;
+import 
org.apache.flink.table.runtime.join.stream.state.OuterJoinRecordStateViews;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.IterableIterator;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Streaming unbounded Join operator which supports INNER/LEFT/RIGHT/FULL JOIN.
+ */
+public class StreamingJoinOperator extends AbstractStreamOperator
+   implements TwoInputStreamOperator {
+
+   private static final long serialVersionUID = -376944622236540545L;
+
+   private final BaseRowTypeInfo leftType;
+   private final BaseRowTypeInfo rightType;
+   private final GeneratedJoinCondition generatedJoinCondition;
+
+   private final JoinInputSideSpec leftInputSideSpec;
+   private final JoinInputSideSpec rightInputSideSpec;
+
+   // whether left side is outer side, e.g. left is outer but right is not 
when LEFT OUTER JOIN
+   private final boolean leftIsOuter;
+   // whether right side is outer side, e.g. right is outer but left is 
not when RIGHT OUTER JOIN
+   private final boolean rightIsOuter;
+
+   /**
+* Should filter null keys.
+*/
+   private final int[] nullFilterKeys;
+
+   /**
+* No keys need to filter null.
+*/
+   private final boolean nullSafe;
+
+   /**
+* Filter null to all keys.
+*/
+   private final boolean filterAllNulls;
+
+   private final long minRetentionTime;
+   private final boolean stateCleaningEnabled;
+
+   private transient JoinCondition joinCondition;
+   private transient TimestampedCollector collector;
+
+   private transient JoinedRow outRow;
+   private transient BaseRow leftNullRow;
+   private transient BaseRow rightNullRow;
+
+   // left join state
+   private transient JoinRecordStateView leftRecordStateView;
+   // right join state
+   private transient JoinRecordStateView rightRecordStateView;
+
+   public StreamingJoinOperator(
+   BaseRowTypeInfo leftType,
+   BaseRowTypeInfo rightType,
+   GeneratedJoinCondition generatedJoinCondition,
+   JoinInputSideSpec leftInputSideSpec,
+   JoinInputSideSpec rightInputSideSpec,
+   boolean leftIsOuter,
+   boolean rightIsOuter,
+   boolean[] filterNullKeys,
+   long minRetentionTime) {
+   this.leftType = leftType;
+   this.rightType 

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API

2019-06-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] 
Support to submit Python Table API jobs via REST API
URL: https://github.com/apache/flink/pull/8532#discussion_r291867994
 
 

 ##
 File path: 
flink-runtime-web/web-dashboard/src/app/pages/submit/submit.component.html
 ##
 @@ -33,23 +33,23 @@
 
   
   
-
-  
-{{jar.name}}
-{{jar.uploaded | date:'-MM-dd, HH:mm:ss'}}
+
 
 Review comment:
   `trackJarBy` -> `trackArtifactBy`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API

2019-06-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] 
Support to submit Python Table API jobs via REST API
URL: https://github.com/apache/flink/pull/8532#discussion_r291865513
 
 

 ##
 File path: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ArtifactPlanHandlerParameterTest.java
 ##
 @@ -33,19 +33,20 @@
 import java.util.stream.Collectors;
 
 /**
- * Tests for the parameter handling of the {@link JarPlanHandler}.
+ * Tests for the parameter handling of the {@link ArtifactPlanHandler}.
  */
-public class JarPlanHandlerParameterTest extends 
JarHandlerParameterTest {
-   private static JarPlanHandler handler;
+public class ArtifactPlanHandlerParameterTest
 
 Review comment:
   Why don't we create a new test for `ArtifactXXX`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API

2019-06-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] 
Support to submit Python Table API jobs via REST API
URL: https://github.com/apache/flink/pull/8532#discussion_r291859039
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractArtifactPlanHeaders.java
 ##
 @@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.JobPlanInfo;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for {@link ArtifactPlanHandler}.
+ */
+public abstract class AbstractArtifactPlanHeaders implements 
MessageHeaders {
+
+   @Override
+   public Class getResponseClass() {
+   return JobPlanInfo.class;
+   }
+
+   @Override
+   public HttpResponseStatus getResponseStatusCode() {
+   return HttpResponseStatus.OK;
+   }
+
+   @Override
+   public Class getRequestClass() {
+   return ArtifactPlanRequestBody.class;
+   }
+
+   @Override
+   public ArtifactPlanMessageParameters getUnresolvedMessageParameters() {
+   return new ArtifactPlanMessageParameters();
+   }
+
+   @Override
+   public String getTargetRestEndpointURL() {
+   return "/artifacts/:" + ArtifactIdPathParameter.KEY + "/plan";
 
 Review comment:
   Can we add a constant for `artifacts`, since there are many places using 
this string? For example `ArtifactDeleteHeaders`, `ArtifactListHeaders`, etc.
   I know the old `AbstractJarPlanHeaders` also uses the `jars` string 
directly, but I 
   think we can make some improvements if it makes sense to us. What do you think?
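   A small sketch of what such a constant could look like (class and field names 
are illustrative, not from this PR):
   ```java
   // Illustrative only: a single place to define the "artifacts" REST path segment.
   public final class ArtifactPathConstants {

       /** Path segment shared by artifact-related REST endpoints. */
       public static final String ARTIFACTS_URL_SEGMENT = "artifacts";

       private ArtifactPathConstants() {
           // no instantiation
       }
   }
   ```
   `getTargetRestEndpointURL()` could then return something like 
`"/" + ArtifactPathConstants.ARTIFACTS_URL_SEGMENT + "/:" + ArtifactIdPathParameter.KEY + "/plan"`.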
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API

2019-06-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] 
Support to submit Python Table API jobs via REST API
URL: https://github.com/apache/flink/pull/8532#discussion_r291867959
 
 

 ##
 File path: 
flink-runtime-web/web-dashboard/src/app/pages/submit/submit.component.ts
 ##
 @@ -154,7 +154,7 @@ export class SubmitComponent implements OnInit, OnDestroy {
   }
 
   constructor(
-private jarService: JarService,
+private artifactService: ArtifactService,
 
 Review comment:
   Line 152:  `trackJarBy` -> `trackArtifactBy`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API

2019-06-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] 
Support to submit Python Table API jobs via REST API
URL: https://github.com/apache/flink/pull/8532#discussion_r291868569
 
 

 ##
 File path: flink-runtime-web/web-dashboard/src/app/services/artifact.service.ts
 ##
 @@ -26,12 +26,12 @@ import { catchError, map } from 'rxjs/operators';
 @Injectable({
   providedIn: 'root'
 })
-export class JarService {
+export class ArtifactService {
   /**
-   * Get uploaded jar list
+   * Get uploaded artifact list
*/
-  loadJarList() {
-return this.httpClient.get(`${BASE_URL}/jars`).pipe(
+  loadArtifactList() {
+return this.httpClient.get(`${BASE_URL}/artifacts`).pipe(
 
 Review comment:
   Should we rename the `JarListInterface` to `ArtifactListInterface` in 
`jar.js`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API

2019-06-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] 
Support to submit Python Table API jobs via REST API
URL: https://github.com/apache/flink/pull/8532#discussion_r291865385
 
 

 ##
 File path: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ArtifactPlanHandlerParameterTest.java
 ##
 @@ -94,27 +95,28 @@ JarPlanMessageParameters 
getWrongJarMessageParameters(ProgramArgsParType program
}
 
@Override
-   JarPlanRequestBody getDefaultJarRequestBody() {
-   return new JarPlanRequestBody();
+   ArtifactPlanRequestBody getDefaultJarRequestBody() {
 
 Review comment:
   `getDefaultJarRequestBody` -> `getDefaultArtifactRequestBody`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API

2019-06-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] 
Support to submit Python Table API jobs via REST API
URL: https://github.com/apache/flink/pull/8532#discussion_r291867360
 
 

 ##
 File path: 
flink-runtime-web/web-dashboard/src/app/pages/submit/submit.component.html
 ##
 @@ -33,23 +33,23 @@
 
   
   
-
-  
-{{jar.name}}
-{{jar.uploaded | date:'-MM-dd, HH:mm:ss'}}
+
 
 Review comment:
   How can we submit a `py` job?
   
![image](https://user-images.githubusercontent.com/22488084/59169831-78270f80-8b6e-11e9-9cc3-e59fa666ee02.png)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API

2019-06-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] 
Support to submit Python Table API jobs via REST API
URL: https://github.com/apache/flink/pull/8532#discussion_r291867447
 
 

 ##
 File path: 
flink-runtime-web/web-dashboard/src/app/pages/submit/submit.component.html
 ##
 @@ -33,23 +33,23 @@
 
   
   
-
-  
-{{jar.name}}
-{{jar.uploaded | date:'-MM-dd, HH:mm:ss'}}
+
 
 Review comment:
   `listOfJar` -> `listOfArtifacts`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API

2019-06-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] 
Support to submit Python Table API jobs via REST API
URL: https://github.com/apache/flink/pull/8532#discussion_r291868440
 
 

 ##
 File path: 
flink-runtime-web/web-dashboard/src/app/pages/submit/submit.component.ts
 ##
 @@ -23,7 +23,7 @@ import { Router } from '@angular/router';
 import { JarFilesItemInterface } from 'interfaces';
 
 Review comment:
   Should we rename the `JarFilesItemInterface` to `ArtifactFilesItemInterface` 
in `jar.js`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API

2019-06-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] 
Support to submit Python Table API jobs via REST API
URL: https://github.com/apache/flink/pull/8532#discussion_r291868098
 
 

 ##
 File path: 
flink-runtime-web/web-dashboard/src/app/pages/submit/submit.component.ts
 ##
 @@ -69,45 +69,45 @@ export class SubmitComponent implements OnInit, OnDestroy {
   }
 
   /**
-   * Delete jar
-   * @param jar
+   * Delete artifact
+   * @param artifact
*/
-  deleteJar(jar: JarFilesItemInterface) {
-this.jarService.deleteJar(jar.id).subscribe(() => {
+  deleteArtifact(artifact: JarFilesItemInterface) {
 
 Review comment:
   `JarFilesItemInterface` -> `ArtifactFilesItemInterface`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API

2019-06-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] 
Support to submit Python Table API jobs via REST API
URL: https://github.com/apache/flink/pull/8532#discussion_r291865337
 
 

 ##
 File path: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ArtifactPlanHandlerParameterTest.java
 ##
 @@ -33,19 +33,20 @@
 import java.util.stream.Collectors;
 
 /**
- * Tests for the parameter handling of the {@link JarPlanHandler}.
+ * Tests for the parameter handling of the {@link ArtifactPlanHandler}.
  */
-public class JarPlanHandlerParameterTest extends 
JarHandlerParameterTest {
-   private static JarPlanHandler handler;
+public class ArtifactPlanHandlerParameterTest
 
 Review comment:
   Replace all `Jar` with `Artifact`? For example, `getWrongJarMessageParameters` -> 
`getWrongArtifactMessageParameters`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API

2019-06-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] 
Support to submit Python Table API jobs via REST API
URL: https://github.com/apache/flink/pull/8532#discussion_r291865173
 
 

 ##
 File path: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ArtifactPlanHandlerParameterTest.java
 ##
 @@ -56,13 +57,13 @@ public static void setup() throws Exception {
}
 
@Override
-   JarPlanMessageParameters getUnresolvedJarMessageParameters() {
+   ArtifactPlanMessageParameters getUnresolvedJarMessageParameters() {
return 
handler.getMessageHeaders().getUnresolvedMessageParameters();
}
 
@Override
-   JarPlanMessageParameters getJarMessageParameters(ProgramArgsParType 
programArgsParType) {
-   final JarPlanMessageParameters parameters = 
getUnresolvedJarMessageParameters();
+   ArtifactPlanMessageParameters 
getJarMessageParameters(ProgramArgsParType programArgsParType) {
 
 Review comment:
   `getJarMessageParameters` -> `getArtifactMessageParameters`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] Support to submit Python Table API jobs via REST API

2019-06-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8532: [FLINK-12541][REST] 
Support to submit Python Table API jobs via REST API
URL: https://github.com/apache/flink/pull/8532#discussion_r291861623
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ArtifactRequestBody.java
 ##
 @@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * Base class for {@link RequestBody} for running an artifact or querying the 
plan.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public abstract class ArtifactRequestBody implements RequestBody {
+
+   static final String FIELD_NAME_ENTRY_CLASS = "entryClass";
+   static final String FIELD_NAME_PROGRAM_ARGUMENTS = "programArgs";
+   static final String FIELD_NAME_PROGRAM_ARGUMENTS_LIST = 
"programArgsList";
+   static final String FIELD_NAME_PARALLELISM = "parallelism";
+   static final String FIELD_NAME_DEPENDENT_ARTIFACT_ID = 
"dependentArtifactId";
+   static final String FIELD_NAME_JOB_ID = "jobId";
+
+   @JsonProperty(FIELD_NAME_ENTRY_CLASS)
+   @Nullable
+   private String entryClassName;
+
+   @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS)
+   @Nullable
+   private String programArguments;
+
+   @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST)
+   @Nullable
+   private List programArgumentsList;
+
+   @JsonProperty(FIELD_NAME_PARALLELISM)
+   @Nullable
+   private Integer parallelism;
+
+   @JsonProperty(FIELD_NAME_DEPENDENT_ARTIFACT_ID)
+   @Nullable
+   private String dependentArtifactId;
+
+   @JsonProperty(FIELD_NAME_JOB_ID)
+   @JsonDeserialize(using = JobIDDeserializer.class)
+   @JsonSerialize(using = JobIDSerializer.class)
+   @Nullable
+   private JobID jobId;
+
+   ArtifactRequestBody() {
+   this(null, null, null, null, null, null);
+   }
+
+   @JsonCreator
+   ArtifactRequestBody(
+   @Nullable @JsonProperty(FIELD_NAME_ENTRY_CLASS) String 
entryClassName,
+   @Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String 
programArguments,
+   @Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST) 
List programArgumentsList,
+   @Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer 
parallelism,
+   @Nullable @JsonProperty(FIELD_NAME_DEPENDENT_ARTIFACT_ID) 
String dependentArtifactId,
+   @Nullable @JsonProperty(FIELD_NAME_JOB_ID) JobID jobId) {
+   this.entryClassName = entryClassName;
+   this.programArguments = programArguments;
+   this.programArgumentsList = programArgumentsList;
+   this.parallelism = parallelism;
+   this.dependentArtifactId = dependentArtifactId;
+   this.jobId = jobId;
+   }
+
+   @Nullable
+   @JsonIgnore
+   public String getEntryClassName() {
+   return entryClassName;
+   }
+
+   @Nullable
+   @JsonIgnore
+   public String getProgramArguments() {
+   return programArguments;
+   }
+
+   @Nullable
+   @JsonIgnore
+   public List getProgramArgumentsList() {
+   return programArgumentsList;
+   }
+
+   @Nullable
+ 

[GitHub] [flink] docete commented on a change in pull request #8626: [FLINK-12742] Add insert into partition grammar as hive dialect

2019-06-09 Thread GitBox
docete commented on a change in pull request #8626: [FLINK-12742] Add insert 
into partition grammar as hive dialect
URL: https://github.com/apache/flink/pull/8626#discussion_r291868992
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropTable.java
 ##
 @@ -33,7 +33,7 @@
 /**
  * DROP TABLE DDL sql call.
  */
-public class SqlDropTable extends SqlDrop {
+public class SqlDropTable extends SqlDrop implements ExtendedSqlType {
 
 Review comment:
   ExtendedSqlNode?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12765) JobMaster calculates resource needs and requests slot for the entire slot sharing group.

2019-06-09 Thread Yun Gao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao updated FLINK-12765:

Description: 
In this version, a task will always request a slot with its own resource need. 
If the resource need is less than the default slot resource, it will always be 
allocated to a default-sized slot. 

 

The extra resources in the slot leave chances for other tasks within the same 
slot sharing group to fit in. To take these chances, the SlotPool will maintain 
the available resources of each allocated slot. The available resource of an allocated 
slot should always be the total resource of the slot minus the resources of tasks 
already assigned onto the slot. In this way, the SlotPool would be able to 
determine whether another task can fit into the slot. If a task cannot fit into 
the slot, for a slot sharing group the SlotPool should request another slot from 
the ResourceManager, and for a colocation group it should fail the job.

> JobMaster calculates resource needs and requests slot for the entire slot 
> sharing group.
> 
>
> Key: FLINK-12765
> URL: https://issues.apache.org/jira/browse/FLINK-12765
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.8.0, 1.9.0
>Reporter: Tony Xintong Song
>Assignee: Yun Gao
>Priority: Major
> Fix For: 1.9.0
>
>
> In this version, a task will always request a slot with its own resource need. 
> If the resource need is less than the default slot resource, it will always 
> be allocated to a default-sized slot. 
>  
> The extra resources in the slot leave chances for other tasks within the 
> same slot sharing group to fit in. To take these chances, the SlotPool will 
> maintain the available resources of each allocated slot. The available resource 
> of an allocated slot should always be the total resource of the slot minus the 
> resources of tasks already assigned onto the slot. In this way, the SlotPool 
> would be able to determine whether another task can fit into the slot. If a 
> task cannot fit into the slot, for a slot sharing group the SlotPool should 
> request another slot from the ResourceManager, and for a colocation group it 
> should fail the job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Myasuka commented on a change in pull request #8571: [FLINK-12682][connectors] StringWriter support custom row delimiter

2019-06-09 Thread GitBox
Myasuka commented on a change in pull request #8571: [FLINK-12682][connectors] 
StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8571#discussion_r291866487
 
 

 ##
 File path: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
 ##
 @@ -82,7 +87,7 @@ public void open(FileSystem fs, Path path) throws 
IOException {
public void write(T element) throws IOException {
FSDataOutputStream outputStream = getStream();
outputStream.write(element.toString().getBytes(charset));
-   outputStream.write('\n');
+   outputStream.write(rowDelimiter.getBytes(charset));
 
 Review comment:
   Sorry for the late reply. I noticed that you have addressed all my suggestions 
in the new PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 commented on a change in pull request #8563: [FLINK-12602][travis] Correct the flink pom `artifactId` config and s…

2019-06-09 Thread GitBox
WeiZhong94 commented on a change in pull request #8563: [FLINK-12602][travis] 
Correct the flink pom `artifactId` config and s…
URL: https://github.com/apache/flink/pull/8563#discussion_r291866343
 
 

 ##
 File path: flink-connectors/flink-sql-connector-kafka-0.10/pom.xml
 ##
 @@ -41,6 +41,14 @@ under the License.

flink-connector-kafka-0.10_${scala.binary.version}
${project.version}

+   

[GitHub] [flink] flinkbot commented on issue #8669: [FLINK-11147][table][docs] Add documentation for TableAggregate Function

2019-06-09 Thread GitBox
flinkbot commented on issue #8669: [FLINK-11147][table][docs] Add documentation 
for TableAggregate Function
URL: https://github.com/apache/flink/pull/8669#issuecomment-500274671
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11147) Add documentation for TableAggregate Function

2019-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11147:
---
Labels: pull-request-available  (was: )

> Add documentation for TableAggregate Function
> -
>
> Key: FLINK-11147
> URL: https://issues.apache.org/jira/browse/FLINK-11147
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> Add documentation for {{TableAggregateFunction}}, similar to the document of 
> {{AggregateFunction}}: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/udfs.html#aggregation-functions
> Most parts of {{TableAggregateFunction}} would be same with 
> {{AggregateFunction}}, except for the ways of handling outputs. 
> {{AggregateFunction}} outputs a scalar value, while 
> {{TableAggregateFunction}} outputs a Table with multi rows and columns.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] hequn8128 opened a new pull request #8669: [FLINK-11147][table][docs] Add documentation for TableAggregate Function

2019-06-09 Thread GitBox
hequn8128 opened a new pull request #8669: [FLINK-11147][table][docs] Add 
documentation for TableAggregate Function
URL: https://github.com/apache/flink/pull/8669
 
 
   
   ## What is the purpose of the change
   
   This pull request add documentation for TableAggregateFunction. 
   
   Note: as discussed in the [PR of FLINK-12401](https://github.com/apache/flink/pull/8550#issuecomment-498913495), we will have two methods, `emitUpdateWithRetract` and `emitUpdateWithoutRetract`, to improve the performance of streaming jobs. However, since key definition on TableAggregateFunction is still under discussion, this PR will not cover the `emitUpdateWithRetract` method. We can open another PR later to document `emitUpdateWithRetract`. This keeps those tasks unblocked.
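   For reference, a minimal sketch of the kind of table aggregate function the new documentation is about, assuming the common Top2 example (the class below is illustrative and not part of this PR):
   ```java
   import org.apache.flink.api.java.tuple.Tuple2;
   import org.apache.flink.table.functions.TableAggregateFunction;
   import org.apache.flink.util.Collector;

   public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2.Top2Accum> {

       // mutable accumulator holding the two largest values seen so far
       public static class Top2Accum {
           public Integer first = Integer.MIN_VALUE;
           public Integer second = Integer.MIN_VALUE;
       }

       @Override
       public Top2Accum createAccumulator() {
           return new Top2Accum();
       }

       // called for every input row
       public void accumulate(Top2Accum acc, Integer v) {
           if (v > acc.first) {
               acc.second = acc.first;
               acc.first = v;
           } else if (v > acc.second) {
               acc.second = v;
           }
       }

       // emits the full result table as (value, rank) pairs
       public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
           if (acc.first != Integer.MIN_VALUE) {
               out.collect(Tuple2.of(acc.first, 1));
           }
           if (acc.second != Integer.MIN_VALUE) {
               out.collect(Tuple2.of(acc.second, 2));
           }
       }
   }
   ```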
   
   
   ## Brief change log
   
 - Add documentation for TableAggregate Function
 - Add Chinese documentation for TableAggregate Function
 - Update the documentation about the Aggregate Function
   
   
   ## Verifying this change
   
   Execute `build_docs.sh` locally and check the changes.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Myasuka commented on a change in pull request #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics …

2019-06-09 Thread GitBox
Myasuka commented on a change in pull request #8668: [FLINK-12784][metrics] 
Support retention policy for InfluxDB metrics …
URL: https://github.com/apache/flink/pull/8668#discussion_r291865661
 
 

 ##
 File path: docs/monitoring/metrics.md
 ##
 @@ -658,6 +658,7 @@ Parameters:
 - `db` - the InfluxDB database to store metrics
 - `username` - (optional) InfluxDB username used for authentication
 - `password` - (optional) InfluxDB username's password used for authentication
+- `rp` - (optional) InfluxDB retention policy, defaults to retention policy 
defined on the server
 
 Review comment:
   Updates to `metrics.md` should also be applied to `metrics-zh.md`. Moreover, `metrics.html` should also be updated; please refer to `flink-docs/README.md` for how to generate the docs.
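   For readers of this thread, a possible reporter configuration exercising the new option from the diff above (host/port/database values are placeholders; the `rp` key name follows the docs added in this PR):
   ```
   metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
   metrics.reporter.influxdb.host: localhost
   metrics.reporter.influxdb.port: 8086
   metrics.reporter.influxdb.db: flink
   metrics.reporter.influxdb.rp: one_week
   ```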


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Myasuka commented on a change in pull request #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics …

2019-06-09 Thread GitBox
Myasuka commented on a change in pull request #8668: [FLINK-12784][metrics] 
Support retention policy for InfluxDB metrics …
URL: https://github.com/apache/flink/pull/8668#discussion_r291865744
 
 

 ##
 File path: 
flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/InfluxdbReporterTest.java
 ##
 @@ -57,6 +57,7 @@
private static final String TEST_INFLUXDB_DB = "test-42";
private static final String METRIC_HOSTNAME = "task-mgr-1";
private static final String METRIC_TM_ID = "tm-id-123";
+   private String retentionPolicy = "";
 
 Review comment:
   How about using a final variable `RETENTION_POLICY`, just like in the tests above?
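   A sketch of the suggested change (the concrete value is just a placeholder):
   ```java
   // mirror the style of the other constants in this test class
   private static final String RETENTION_POLICY = "one_hour";
   ```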


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on issue #8629: [FLINK-12088][table-runtime-blink] Introduce unbounded streaming inner/left/right/full join operator

2019-06-09 Thread GitBox
wuchong commented on issue #8629: [FLINK-12088][table-runtime-blink] Introduce 
unbounded streaming inner/left/right/full join operator
URL: https://github.com/apache/flink/pull/8629#issuecomment-500273784
 
 
   Hi @JingsongLi, I have addressed the comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lamber-ken commented on issue #8583: [FLINK-11820][serialization] SimpleStringSchema handle message record which value is null

2019-06-09 Thread GitBox
lamber-ken commented on issue #8583: [FLINK-11820][serialization] 
SimpleStringSchema handle message record which value is null
URL: https://github.com/apache/flink/pull/8583#issuecomment-500272524
 
 
   Hi, @aljoscha @GJL, from `Kafka09Fetcher#runFetchLoop` we can see that it deserializes the Kafka value first and calls the `emitRecord` method afterwards.
   ```
   while (running) {
       // this blocks until we get the next records
       // it automatically re-throws exceptions encountered in the consumer thread
       final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

       // get the records for each topic partition
       for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {

           List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                   records.records(partition.getKafkaPartitionHandle());

           for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {

               final T value = deserializer.deserialize(record);

               if (deserializer.isEndOfStream(value)) {
                   // end of stream signaled
                   running = false;
                   break;
               }

               // emit the actual record. this also updates offset state atomically
               // and deals with timestamps and watermark generation
               emitRecord(value, partition, record.offset(), record);
           }
       }
   }
   ```
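   To illustrate the implication for the schema, a hedged sketch (not the actual change in this PR) of a `deserialize` that tolerates a null Kafka value:
   ```java
   import java.nio.charset.StandardCharsets;

   @Override
   public String deserialize(byte[] message) {
       // a Kafka tombstone record carries a null value; map it to null instead of throwing an NPE
       return message == null ? null : new String(message, StandardCharsets.UTF_8);
   }
   ```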
   
   
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-12716) Add an interactive shell for Python Table API

2019-06-09 Thread Huang Xingbo (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo reassigned FLINK-12716:


Assignee: Huang Xingbo

> Add an interactive shell for Python Table API
> -
>
> Key: FLINK-12716
> URL: https://issues.apache.org/jira/browse/FLINK-12716
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Huang Xingbo
>Priority: Major
>
> We should add an interactive shell for the Python Table API. It will have
> similar functionality to the Scala Shell.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 edited a comment on issue #8550: [FLINK-12401][table] Support incremental emit under AccRetract mode for non-window streaming FlatAggregate on Table API

2019-06-09 Thread GitBox
sunjincheng121 edited a comment on issue #8550: [FLINK-12401][table] Support 
incremental emit under AccRetract mode for non-window streaming FlatAggregate 
on Table API
URL: https://github.com/apache/flink/pull/8550#issuecomment-498913495
 
 
   Since the stream operator has two execution modes, `ACC` and `ACCRetract`, users can achieve better performance by implementing special interfaces for streaming. The table below is a quick summary.
   
   Mode | emitValue | emitUpdateWithRetract | emitUpdateWithoutRetract
   -- | -- | -- | --
   ACC | Y | N | Y
   ACCRetract | Y | Y | N
   
   - emitValue - for batch and streaming.
   - emitUpdateWithRetract - only for streaming in ACC mode (needs key definition on TableAggregateFunction, [under discussion](https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit#heading=h.evvcpnbn30wn)).
   - emitUpdateWithoutRetract - only for streaming in ACCRetract mode.
   
   So, in this PR, changing the method name from `emitRetractValueIncrementally` to `emitUpdateWithRetract` would be better. What do you think?
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12786) Implement local aggregation in Flink

2019-06-09 Thread vinoyang (JIRA)
vinoyang created FLINK-12786:


 Summary: Implement local aggregation in Flink
 Key: FLINK-12786
 URL: https://issues.apache.org/jira/browse/FLINK-12786
 Project: Flink
  Issue Type: New Feature
  Components: API / DataStream
Reporter: vinoyang
Assignee: vinoyang


Currently, keyed streams are widely used to perform aggregating operations 
(e.g., reduce, sum and window) on the elements that have the same key. When 
executed at runtime, the elements with the same key will be sent to and 
aggregated by the same task.
 
The performance of these aggregating operations is very sensitive to the 
distribution of keys. In cases where the distribution of keys follows a power 
law, performance is significantly degraded. Worse still, increasing the degree 
of parallelism does not help when a task is overloaded by a single key.
 
Local aggregation is a widely-adopted method to reduce the performance 
degradation caused by data skew. We can decompose the aggregating operations 
into two phases. In 
the first phase, we aggregate the elements of the same key at the sender side 
to obtain partial results. Then at the second phase, these partial results are 
sent to receivers according to their keys and are combined to obtain the final 
result. Since the number of partial results received by each receiver is 
limited by the number of senders, the imbalance among receivers can be reduced. 
Besides, by reducing the amount of transferred data the performance can be 
further improved.
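A rough illustration of the two-phase idea with today's DataStream API (this is not the proposed API; names and the flush threshold are placeholders, and a real implementation would also have to flush the buffer on checkpoints and at end of input):

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

public class LocalAggregationSketch {

    public static DataStream<Tuple2<String, Long>> localThenGlobalSum(
            DataStream<Tuple2<String, Long>> input) {

        // phase 1: pre-aggregate per key inside each sender task and emit partial sums
        DataStream<Tuple2<String, Long>> partial = input.flatMap(
            new RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
                private transient Map<String, Long> buffer;
                private transient int seen;

                @Override
                public void open(Configuration parameters) {
                    buffer = new HashMap<>();
                }

                @Override
                public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) {
                    buffer.merge(value.f0, value.f1, Long::sum);
                    if (++seen >= 1000) { // placeholder flush threshold
                        for (Map.Entry<String, Long> e : buffer.entrySet()) {
                            out.collect(Tuple2.of(e.getKey(), e.getValue()));
                        }
                        buffer.clear();
                        seen = 0;
                    }
                }
            });

        // phase 2: combine the partial results after the shuffle
        return partial.keyBy(t -> t.f0).sum(1);
    }
}
{code}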

The design documentation is here: 
[https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing]

The discussion thread is here: 
[http://mail-archives.apache.org/mod_mbox/flink-dev/201906.mbox/%3CCAA_=o7dvtv8zjcxknxyoyy7y_ktvgexrvb4zhxjwzuhsulz...@mail.gmail.com%3E]

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12785) RocksDB savepoint recovery can use a lot of unmanaged memory

2019-06-09 Thread Congxian Qiu(klion26) (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16859657#comment-16859657
 ] 

Congxian Qiu(klion26) commented on FLINK-12785:
---

Thanks for filing the issue, I will have a look at it.

> RocksDB savepoint recovery can use a lot of unmanaged memory
> 
>
> Key: FLINK-12785
> URL: https://issues.apache.org/jira/browse/FLINK-12785
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Mike Kaplinskiy
>Priority: Major
>
> I'm running an application that's backfilling data from Kafka. There's 
> approximately 3 years worth of data, with a lot of watermark skew (i.e. new 
> partitions were created over time) and I'm using daily windows. This makes a 
> lot of the windows buffer their contents before the watermark catches up to 
> "release" them. In turn, this gives me a lot of in-flight windows (200-300) 
> with very large state keys in rocksdb (on the order of 40-50mb per key).
> Running the pipeline tends to be mostly fine - it's not terribly fast when 
> appends happen but everything works. The problem comes when doing a savepoint 
> restore - specifically, the taskmanagers eat ram until the kernel kills it 
> due to being out of memory. The extra memory isn't JVM heap since the memory 
> usage of the process is ~4x the -Xmx value and there aren't any 
> {{OutOfMemoryError}} exceptions.
> I traced the culprit of the memory growth to 
> [RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212]
>  . Specifically, while the keys/values are deserialized on the Java heap, 
> {{RocksDBWriteBatchWrapper}} forwards it to RocksDB's {{WriteBatch}} which 
> buffers in unmanaged memory. That's not in itself an issue, but 
> {{RocksDBWriteBatchWrapper}} flushes only based on a number of records - not 
> a number of bytes in-flight. Specifically, {{RocksDBWriteBatchWrapper}} will 
> flush only once it has 500 records, and at 40mb per key, that's at least 20Gb 
> of unmanaged memory before a flush.
> My suggestion would be to add an additional flush criteria to 
> {{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 
> records or 5mb buffered). This way large key writes would be immediately 
> flushed to RocksDB on recovery or even writes. I applied this approach and I 
> was able to complete a savepoint restore for my job. That said, I'm not 
> entirely sure what else this change would impact since I'm not very familiar 
> with Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12785) RocksDB savepoint recovery can use a lot of unmanaged memory

2019-06-09 Thread Mike Kaplinskiy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mike Kaplinskiy updated FLINK-12785:

Description: 
I'm running an application that's backfilling data from Kafka. There's 
approximately 3 years worth of data, with a lot of watermark skew (i.e. new 
partitions were created over time) and I'm using daily windows. This makes a 
lot of the windows buffer their contents before the watermark catches up to 
"release" them. In turn, this gives me a lot of in-flight windows (200-300) 
with very large state keys in rocksdb (on the order of 40-50mb per key).

Running the pipeline tends to be mostly fine - it's not terribly fast when 
appends happen but everything works. The problem comes when doing a savepoint 
restore - specifically, the taskmanagers eat ram until the kernel kills it due 
to being out of memory. The extra memory isn't JVM heap since the memory usage 
of the process is ~4x the -Xmx value and there aren't any {{OutOfMemoryError}} 
exceptions.

I traced the culprit of the memory growth to 
[RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212]
 . Specifically, while the keys/values are deserialized on the Java heap, 
{{RocksDBWriteBatchWrapper}} forwards it to RocksDB's {{WriteBatch}} which 
buffers in unmanaged memory. That's not in itself an issue, but 
{{RocksDBWriteBatchWrapper}} flushes only based on a number of records - not a 
number of bytes in-flight. Specifically, {{RocksDBWriteBatchWrapper}} will 
flush only once it has 500 records, and at 40mb per key, that's at least 20Gb 
of unmanaged memory before a flush.

My suggestion would be to add an additional flush criteria to 
{{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 
records or 5mb buffered). This way large key writes would be immediately 
flushed to RocksDB on recovery or even writes. I applied this approach and I 
was able to complete a savepoint restore for my job. That said, I'm not 
entirely sure what else this change would impact since I'm not very familiar 
with Flink.

  was:
I'm running an application that's backfilling data from Kafka. There's 
approximately 3 years worth of data, with a lot of watermark skew (i.e. new 
partitions were created over time) and I'm using daily windows. This makes a 
lot of the windows buffer their contents before the watermark catches up to 
"release" them. In turn, this gives me a lot of in-flight windows (200-300) 
with very large state keys in rocksdb (on the order of 40-50mb per key).

Running the pipeline tends to be mostly fine - it's not terribly fast when 
appends happen but everything works. The problem comes when doing a savepoint 
restore - specifically, the taskmanagers eat ram until the kernel kills it due 
to being out of memory. The extra memory isn't JVM heap since the memory usage 
of the process is ~4x the -Xmx value and there aren't any {{OutOfMemoryError}} 
exceptions.

I traced the culprit of the memory growth to 
[RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212]
 . Specifically, while the keys/values are deserialized on the Java heap, 
{{RocksDBWriteBatchWrapper}} forwards it to RocksDB's {{WriteBatch}} which 
buffers in unmanaged memory. That's not in itself an issue, but 
{{RocksDBWriteBatchWrapper}} flushes only based on a number of records - not a 
number of bytes in-flight. Specifically, {{RocksDBWriteBatchWrapper}} will 
flush only once it has 500 records, and at 40mb per key, that's at least 20Gb 
of unmanaged memory before a flush.

My suggestion would be to add an additional flush criteria to 
{{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 
records or 5mb buffered). This way large key writes would be immediately 
flushed to RocksDB on recovery or even writes. I applied this approach and I 
was able to complete a savepoint restore for my jon. That said, I'm not 
entirely sure what else this change would impact since I'm not very familiar 
with Flink.


> RocksDB savepoint recovery can use a lot of unmanaged memory
> 
>
> Key: FLINK-12785
> URL: https://issues.apache.org/jira/browse/FLINK-12785
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Mike Kaplinskiy
>Priority: Major
>
> I'm running an application that's backfilling data from Kafka. There's 
> approximately 3 years worth of data, 

[jira] [Updated] (FLINK-12785) RocksDB savepoint recovery can use a lot of unmanaged memory

2019-06-09 Thread Mike Kaplinskiy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mike Kaplinskiy updated FLINK-12785:

Description: 
I'm running an application that's backfilling data from Kafka. There's 
approximately 3 years worth of data, with a lot of watermark skew (i.e. new 
partitions were created over time) and I'm using daily windows. This makes a 
lot of the windows buffer their contents before the watermark catches up to 
"release" them. In turn, this gives me a lot of in-flight windows (200-300) 
with very large state keys in rocksdb (on the order of 40-50mb per key).

Running the pipeline tends to be mostly fine - it's not terribly fast when 
appends happen but everything works. The problem comes when doing a savepoint 
restore - specifically, the taskmanagers eat ram until the kernel kills it due 
to being out of memory. The extra memory isn't JVM heap since the memory usage 
of the process is ~4x the -Xmx value and there aren't any {{OutOfMemoryError}} 
exceptions.

I traced the culprit of the memory growth to 
[RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212]
 . Specifically, while the keys/values are deserialized on the Java heap, 
{{RocksDBWriteBatchWrapper}} forwards it to RocksDB's {{WriteBatch}} which 
buffers in unmanaged memory. That's not in itself an issue, but 
{{RocksDBWriteBatchWrapper}} flushes only based on a number of records - not a 
number of bytes in-flight. Specifically, {{RocksDBWriteBatchWrapper}} will 
flush only once it has 500 records, and at 40mb per key, that's at least 20Gb 
of unmanaged memory before a flush.

My suggestion would be to add an additional flush criteria to 
{{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 
records or 5mb buffered). This way large key writes would be immediately 
flushed to RocksDB on recovery or even writes. I applied this approach and I 
was able to complete a savepoint restore for my jon. That said, I'm not 
entirely sure what else this change would impact since I'm not very familiar 
with Flink.

  was:
I'm running an application that's backfilling data from Kafka. There's 
approximately 3 years worth of data, with a lot of watermark skew (i.e. new 
partitions were created over time) and I'm using daily windows. This makes a 
lot of the windows buffer their contents before the watermark catches up to 
"release" them. In turn, this gives me a lot of in-flight windows (200-300) 
with very large state keys in rocksdb (on the order of 40-50mb per key).

Running the pipeline tends to be mostly fine - it's not terribly fast when 
appends happen but everything works. The problem comes when doing a savepoint 
restore - specifically, the taskmanagers eat ram until the kernel kills it due 
to being out of memory. The extra memory isn't JVM heap since the memory usage 
of the process is ~4x the -Xmx value and there aren't any {{OutOfMemoryError}} 
exceptions.

I traced the culprit of the memory growth to 
[RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212]
 . Specifically, while the keys/values are deserialized on the Java heap, 
{{RocksDBWriteBatchWrapper}} forwards it to RocksDB's {{WriteBatch}} which 
buffers in unmanaged memory. That's not in itself an issue, but 
{{RocksDBWriteBatchWrapper}} flushes only based on a number of records - not a 
number of bytes in-flight. Specifically, {{RocksDBWriteBatchWrapper}} will 
flush only once it has 500 records, and at 40mb per key, that's at least 20Gb 
of managed memory before a flush.

My suggestion would be to add an additional flush criteria to 
{{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 
records or 5mb buffered). This way large key writes would be immediately 
flushed to RocksDB on recovery or even writes. I applied this approach and I 
was able to complete a savepoint restore for my jon. That said, I'm not 
entirely sure what else this change would impact since I'm not very familiar 
with Flink.


> RocksDB savepoint recovery can use a lot of unmanaged memory
> 
>
> Key: FLINK-12785
> URL: https://issues.apache.org/jira/browse/FLINK-12785
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Mike Kaplinskiy
>Priority: Major
>
> I'm running an application that's backfilling data from Kafka. There's 
> approximately 3 years worth of data, 

[jira] [Updated] (FLINK-12785) RocksDB savepoint recovery can use a lot of unmanaged memory

2019-06-09 Thread Mike Kaplinskiy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mike Kaplinskiy updated FLINK-12785:

Description: 
I'm running an application that's backfilling data from Kafka. There's 
approximately 3 years worth of data, with a lot of watermark skew (i.e. new 
partitions were created over time) and I'm using daily windows. This makes a 
lot of the windows buffer their contents before the watermark catches up to 
"release" them. In turn, this gives me a lot of in-flight windows (200-300) 
with very large state keys in rocksdb (on the order of 40-50mb per key).

Running the pipeline tends to be mostly fine - it's not terribly fast when 
appends happen but everything works. The problem comes when doing a savepoint 
restore - specifically, the taskmanagers eat ram until the kernel kills it due 
to being out of memory. The extra memory isn't JVM heap since the memory usage 
of the process is ~4x the -Xmx value and there aren't any {{OutOfMemoryError}} 
exceptions.

I traced the culprit of the memory growth to 
[RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212]
 . Specifically, while the keys/values are deserialized on the Java heap, 
{{RocksDBWriteBatchWrapper}} forwards it to RocksDB's {{WriteBatch}} which 
buffers in unmanaged memory. That's not in itself an issue, but 
{{RocksDBWriteBatchWrapper}} flushes only based on a number of records - not a 
number of bytes in-flight. Specifically, {{RocksDBWriteBatchWrapper}} will 
flush only once it has 500 records, and at 40mb per key, that's at least 20Gb 
of managed memory before a flush.

My suggestion would be to add an additional flush criteria to 
{{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 
records or 5mb buffered). This way large key writes would be immediately 
flushed to RocksDB on recovery or even writes. I applied this approach and I 
was able to complete a savepoint restore for my jon. That said, I'm not 
entirely sure what else this change would impact since I'm not very familiar 
with Flink.

  was:
I'm running an application that's backfilling data from Kafka. There's 
approximately 3 years worth of data, with a lot of watermark skew (i.e. new 
partitions were created over time) and I'm using daily windows. This makes a 
lot of the windows buffer their contents before the watermark catches up to 
"release" them. In turn, this gives me a lot of in-flight windows (200-300) 
with very large state keys in rocksdb (on the order of 40-50mb per key).

Running the pipeline tends to be mostly fine - it's not terribly fast when 
appends happen but everything works. The problem comes when doing a savepoint 
restore - specifically, the taskmanagers eat ram until the kernel kills it due 
to being out of memory. The extra memory isn't JVM heap since the memory usage 
of the process is ~4x the -Xmx value and there aren't any {{OutOfMemoryError}} 
exceptions.

I traced the culprit of the memory growth to 
[RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212]
 . Specifically, while the keys/values are deserialized on the Java heap, 
{{RocksDBWriteBatchWrapper}} forwards it to RocksDB's {{WriteBatch}} which 
buffers in  unmanaged memory. That's not in itself an issue, but 
{{RocksDBWriteBatchWrapper}} flushes only based on a number of records - not a 
number of bytes in-flight. Specifically, {{RocksDBWriteBatchWrapper}} will 
flush only once it has 500 records, and at 40mb per key, that's at least 20Gb 
of managed memory before a flush.

My suggestion would be to add an additional flush criteria to 
{{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 
records or 5mb buffered). This way large key writes would be immediately 
flushed to RocksDB on recovery or even writes. I applied this approach and I 
was able to complete a savepoint restore for my jon. That said, I'm not 
entirely sure what else this change would impact since I'm not very familiar 
with Flink.


> RocksDB savepoint recovery can use a lot of unmanaged memory
> 
>
> Key: FLINK-12785
> URL: https://issues.apache.org/jira/browse/FLINK-12785
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Mike Kaplinskiy
>Priority: Major
>
> I'm running an application that's backfilling data from Kafka. There's 
> approximately 3 years worth of data, with 

[GitHub] [flink] becketqin commented on issue #8653: [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers are recycled when FlinkKafk…

2019-06-09 Thread GitBox
becketqin commented on issue #8653: [FLINK-10455][Connectors/Kafka] Ensure all 
the KafkaProducers are recycled when FlinkKafk…
URL: https://github.com/apache/flink/pull/8653#issuecomment-500265816
 
 
   @pnowojski Will you have time to take a look? It is a small patch and 
hopefully won't take too much time. Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12785) RocksDB savepoint recovery can use a lot of unmanaged memory

2019-06-09 Thread Mike Kaplinskiy (JIRA)
Mike Kaplinskiy created FLINK-12785:
---

 Summary: RocksDB savepoint recovery can use a lot of unmanaged 
memory
 Key: FLINK-12785
 URL: https://issues.apache.org/jira/browse/FLINK-12785
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Mike Kaplinskiy


I'm running an application that's backfilling data from Kafka. There's 
approximately 3 years worth of data, with a lot of watermark skew (i.e. new 
partitions were created over time) and I'm using daily windows. This makes a 
lot of the windows buffer their contents before the watermark catches up to 
"release" them. In turn, this gives me a lot of in-flight windows (200-300) 
with very large state keys in rocksdb (on the order of 40-50mb per key).

Running the pipeline tends to be mostly fine - it's not terribly fast when 
appends happen but everything works. The problem comes when doing a savepoint 
restore - specifically, the taskmanagers eat ram until the kernel kills it due 
to being out of memory. The extra memory isn't JVM heap since the memory usage 
of the process is ~4x the -Xmx value and there aren't any {{OutOfMemoryError}} 
exceptions.

I traced the culprit of the memory growth to 
[RocksDBFullRestoreOperation.java#L212|https://github.com/apache/flink/blob/68910fa5381c8804ddbde3087a2481911ebd6d85/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java#L212]
 . Specifically, while the keys/values are deserialized on the Java heap, 
{{RocksDBWriteBatchWrapper}} forwards it to RocksDB's {{WriteBatch}} which 
buffers in  unmanaged memory. That's not in itself an issue, but 
{{RocksDBWriteBatchWrapper}} flushes only based on a number of records - not a 
number of bytes in-flight. Specifically, {{RocksDBWriteBatchWrapper}} will 
flush only once it has 500 records, and at 40mb per key, that's at least 20Gb 
of managed memory before a flush.

My suggestion would be to add an additional flush criteria to 
{{RocksDBWriteBatchWrapper}} - one based on {{batch.getDataSize()}} (e.g. 500 
records or 5mb buffered). This way large key writes would be immediately 
flushed to RocksDB on recovery or even writes. I applied this approach and I 
was able to complete a savepoint restore for my job. That said, I'm not 
entirely sure what else this change would impact since I'm not very familiar 
with Flink.
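
A sketch of the suggested size-based criterion (field and method names are approximations of the wrapper, not the actual Flink code):

{code:java}
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;

// inside the write-batch wrapper around RocksDB's WriteBatch
private static final int CAPACITY = 500;                      // existing record-count limit
private static final long MAX_BATCH_BYTES = 5L * 1024 * 1024; // hypothetical 5 MB limit

public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
    batch.put(handle, key, value);
    // flush on either criterion so a handful of 40-50mb entries cannot pile up off-heap
    if (batch.count() == CAPACITY || batch.getDataSize() >= MAX_BATCH_BYTES) {
        flush();
    }
}
{code}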



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12776) Ambiguous content in flink-dist NOTICE file

2019-06-09 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16859648#comment-16859648
 ] 

sunjincheng commented on FLINK-12776:
-

We can continue to discuss it in the JIRA.

> Ambiguous content in flink-dist NOTICE file
> ---
>
> Key: FLINK-12776
> URL: https://issues.apache.org/jira/browse/FLINK-12776
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Release System
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.9.0
>
> Attachments: image-2019-06-10-09-39-06-637.png
>
>
> With FLINK-12409 we include the new flink-python module in flink-dist. As a 
> result we now have 2 {{flink-python}} entries in the flink-dist NOTICE file, 
> one for the old batch API and one for the newly added one, which is 
> ambiguous. We should rectify this by either excluding the old batch API from 
> flink-dist, or rename the new module to something like {{flink-api-python}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12776) Ambiguous content in flink-dist NOTICE file

2019-06-09 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16859647#comment-16859647
 ] 

sunjincheng commented on FLINK-12776:
-

Copy the discuss history here: 

!image-2019-06-10-09-39-06-637.png!

> Ambiguous content in flink-dist NOTICE file
> ---
>
> Key: FLINK-12776
> URL: https://issues.apache.org/jira/browse/FLINK-12776
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Release System
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.9.0
>
> Attachments: image-2019-06-10-09-39-06-637.png
>
>
> With FLINK-12409 we include the new flink-python module in flink-dist. As a 
> result we now have 2 {{flink-python}} entries in the flink-dist NOTICE file, 
> one for the old batch API and one for the newly added one, which is 
> ambiguous. We should rectify this by either excluding the old batch API from 
> flink-dist, or rename the new module to something like {{flink-api-python}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12776) Ambiguous content in flink-dist NOTICE file

2019-06-09 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-12776:

Attachment: image-2019-06-10-09-39-06-637.png

> Ambiguous content in flink-dist NOTICE file
> ---
>
> Key: FLINK-12776
> URL: https://issues.apache.org/jira/browse/FLINK-12776
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Release System
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.9.0
>
> Attachments: image-2019-06-10-09-39-06-637.png
>
>
> With FLINK-12409 we include the new flink-python module in flink-dist. As a 
> result we now have 2 {{flink-python}} entries in the flink-dist NOTICE file, 
> one for the old batch API and one for the newly added one, which is 
> ambiguous. We should rectify this by either excluding the old batch API from 
> flink-dist, or rename the new module to something like {{flink-api-python}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] becketqin commented on a change in pull request #8653: [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers are recycled when FlinkKafk…

2019-06-09 Thread GitBox
becketqin commented on a change in pull request #8653: 
[FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers are recycled when 
FlinkKafk…
URL: https://github.com/apache/flink/pull/8653#discussion_r291856564
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
 ##
 @@ -16,15 +16,14 @@
 # limitations under the License.
 

 
-log4j.rootLogger=INFO, testlogger
+log4j.rootLogger=OFF, testlogger
 
 Review comment:
   In general, unit tests should run silently, without log4j output, by default. 
Otherwise they will pollute the screen and slow down the tests. Failures should 
be reported via the testing framework, such as JUnit/TestNG. If detailed logs are 
needed for debugging, one should manually turn the log4j output back on.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8623: [FLINK-12719][python] Add the Python catalog API

2019-06-09 Thread GitBox
bowenli86 commented on a change in pull request #8623: [FLINK-12719][python] 
Add the Python catalog API
URL: https://github.com/apache/flink/pull/8623#discussion_r291852777
 
 

 ##
 File path: flink-python/pyflink/table/catalog.py
 ##
 @@ -0,0 +1,909 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from py4j.java_gateway import java_import
+
+from pyflink.java_gateway import get_gateway
+from pyflink.table.table_schema import TableSchema
+
+__all__ = ['Catalog', 'CatalogDatabase', 'CatalogBaseTable', 
'CatalogPartition', 'CatalogFunction',
+   'ObjectPath', 'CatalogPartitionSpec', 'CatalogTableStatistics',
+   'CatalogColumnStatistics', 'HiveCatalog', 'HiveCatalogDatabase', 
'HiveCatalogFunction',
+   'HiveCatalogPartition', 'HiveCatalogTable', 'HiveCatalogView']
+
+
+class Catalog(object):
+"""
+Catalog is responsible for reading and writing metadata such as 
database/table/views/UDFs
+from a registered catalog. It connects a registered catalog and Flink's 
Table API.
+"""
+
+def __init__(self, j_catalog):
+self._j_catalog = j_catalog
+
+def get_default_database(self):
+"""
+Get the name of the default database for this catalog. The default 
database will be the
+current database for the catalog when user's session doesn't specify a 
current database.
+The value probably comes from configuration, will not change for the 
life time of the
+catalog instance.
+
+:return: The name of the current database.
+"""
+return self._j_catalog.getDefaultDatabase()
+
+def list_databases(self):
+"""
+Get the names of all databases in this catalog.
+
+:return: A list of the names of all databases.
+"""
+return list(self._j_catalog.listDatabases())
+
+def get_database(self, database_name):
+"""
+Get a database from this catalog.
+
+:param database_name: Name of the database.
+:return: The requested database.
+"""
+return CatalogDatabase(self._j_catalog.getDatabase(database_name))
+
+def database_exists(self, database_name):
+"""
+Check if a database exists in this catalog.
+
+:param database_name: Name of the database.
+:return: true if the given database exists in the catalog, false otherwise.
+"""
+return self._j_catalog.databaseExists(database_name)
+
+def create_database(self, name, database, ignore_if_exists):
+"""
+Create a database.
+
+:param name: Name of the database to be created.
+:param database: The database definition.
+:param ignore_if_exists: Flag to specify behavior when a database with 
the given name
+ already exists:
+ if set to false, throw a 
DatabaseAlreadyExistException,
+ if set to true, do nothing.
+"""
+self._j_catalog.createDatabase(name, database._j_catalog_database, 
ignore_if_exists)
+
+def drop_database(self, name, ignore_if_exists):
+"""
+Drop a database.
+
+:param name: Name of the database to be dropped.
+:param ignore_if_exists: Flag to specify behavior when the database 
does not exist:
+ if set to false, throw an exception,
+ if set to true, do nothing.
+"""
+self._j_catalog.dropDatabase(name, ignore_if_exists)
+
+def alter_database(self, name, new_database, ignore_if_not_exists):
+"""
+Modify an existing database.
+
+:param name: Name of the database to be modified.
+:param new_database: The new database definition.
+:param ignore_if_not_exists: Flag to specify behavior when the given 
database does not
+ exist:
+ if set to false, throw an exception,
+ if set to true, do 

[jira] [Commented] (FLINK-12620) Deadlock in task deserialization

2019-06-09 Thread Mike Kaplinskiy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16859581#comment-16859581
 ] 

Mike Kaplinskiy commented on FLINK-12620:
-

Unfortunately my use-case is a bit strange - I'm running Flink via Beam - and 
via a Clojure API on top of Beam at that.

Attached [^BatchJob.java] is my attempt to recreate the issue via Flink's 
Java API. I triggered it by making a diamond, but I'm pretty sure you can 
trigger this issue more easily via the streaming API, where there is always 
more than 1 subtask running. Also attached is the jstack output 
[^jstack_repro.txt] from trying to run the job. Let me know if you need 
anything else.
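
For reference, a minimal standalone illustration of the underlying JVM behaviour (this is not the attached BatchJob.java, just the circular static-initialization pattern triggered from two threads; it may need a few runs to hit the deadlock):

{code:java}
public class InitDeadlockDemo {

    static class B {
        static { C.touch(); } // B's initializer needs C
        static void touch() { }
    }

    static class C {
        static { B.touch(); } // C's initializer needs B
        static void touch() { }
    }

    public static void main(String[] args) {
        // each thread grabs one class's initialization lock and then blocks
        // waiting for the other, mirroring the Task1/Task2 stacks above
        new Thread(B::touch).start();
        new Thread(C::touch).start();
    }
}
{code}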

> Deadlock in task deserialization
> 
>
> Key: FLINK-12620
> URL: https://issues.apache.org/jira/browse/FLINK-12620
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.8.0
>Reporter: Mike Kaplinskiy
>Priority: Major
> Attachments: BatchJob.java, jstack_repro.txt, jstack_snippet.txt
>
>
> When running a batch job, I ran into an issue where task deserialization 
> caused a deadlock. Specifically, if you have a static initialization 
> dependency graph that looks like this (these are all classes):
> {code:java}
> Task1 depends on A
> A depends on B
> B depends on C
> C depends on B [cycle]
> Task2 depends on C{code}
> What seems to happen is a deadlock. Specifically, threads are started on the 
> task managers that simultaneously call BatchTask.instantiateUserCode on both 
> Task1 and Task2. This starts deserializing the classes and initializing them. 
> Here's the deadlock scenario, as a stack:
> {code:java}
> Time>
> T1: [deserialize] -> Task1 -> A -> B -> (wait for 
> C)
> T2: [deserialize] -> Task2              -> C -> (wait for 
> B){code}
>  
> A similar scenario from the web: 
> [https://www.farside.org.uk/201510/deadlocks_in_java_class_initialisation] .
>  
> For my specific problem, I'm running into this within Clojure - 
> {{clojure.lang.RT}} has a dep on {{clojure.lang.Util}} which has a dep with 
> {{clojure.lang.Numbers}} which depends on {{clojure.lang.RT}} again. 
> Deserializing different clojure functions calls one or the other first which 
> deadlocks task managers.
>  
> I built a version of flink-core that had 
> {{org.apache.flink.util.InstantiationUtil.readObjectFromConfig}} 
> synchronized, but I'm not sure that it's the proper fix. I'm happy to submit 
> that as a patch, but I'm not familiar enough with the codebase to say that 
> it's the correct solution - ideally all Java class loading is synchronized, 
> but I'm not sure how to do that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12620) Deadlock in task deserialization

2019-06-09 Thread Mike Kaplinskiy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mike Kaplinskiy updated FLINK-12620:

Attachment: jstack_repro.txt

> Deadlock in task deserialization
> 
>
> Key: FLINK-12620
> URL: https://issues.apache.org/jira/browse/FLINK-12620
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.8.0
>Reporter: Mike Kaplinskiy
>Priority: Major
> Attachments: BatchJob.java, jstack_repro.txt, jstack_snippet.txt
>
>
> When running a batch job, I ran into an issue where task deserialization 
> caused a deadlock. Specifically, if you have a static initialization 
> dependency graph that looks like this (these are all classes):
> {code:java}
> Task1 depends on A
> A depends on B
> B depends on C
> C depends on B [cycle]
> Task2 depends on C{code}
> What seems to happen is a deadlock. Specifically, threads are started on the 
> task managers that simultaneously call BatchTask.instantiateUserCode on both 
> Task1 and Task2. This starts deserializing the classes and initializing them. 
> Here's the deadlock scenario, as a stack:
> {code:java}
> Time>
> T1: [deserialize] -> Task1 -> A -> B -> (wait for 
> C)
> T2: [deserialize] -> Task2              -> C -> (wait for 
> B){code}
>  
> A similar scenario from the web: 
> [https://www.farside.org.uk/201510/deadlocks_in_java_class_initialisation] .
>  
> For my specific problem, I'm running into this within Clojure - 
> {{clojure.lang.RT}} has a dep on {{clojure.lang.Util}} which has a dep with 
> {{clojure.lang.Numbers}} which depends on {{clojure.lang.RT}} again. 
> Deserializing different clojure functions calls one or the other first which 
> deadlocks task managers.
>  
> I built a version of flink-core that had 
> {{org.apache.flink.util.InstantiationUtil.readObjectFromConfig}} 
> synchronized, but I'm not sure that it's the proper fix. I'm happy to submit 
> that as a patch, but I'm not familiar enough with the codebase to say that 
> it's the correct solution - ideally all Java class loading is synchronized, 
> but I'm not sure how to do that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12620) Deadlock in task deserialization

2019-06-09 Thread Mike Kaplinskiy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mike Kaplinskiy updated FLINK-12620:

Attachment: BatchJob.java

> Deadlock in task deserialization
> 
>
> Key: FLINK-12620
> URL: https://issues.apache.org/jira/browse/FLINK-12620
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.8.0
>Reporter: Mike Kaplinskiy
>Priority: Major
> Attachments: BatchJob.java, jstack_snippet.txt
>
>
> When running a batch job, I ran into an issue where task deserialization 
> caused a deadlock. Specifically, if you have a static initialization 
> dependency graph that looks like this (these are all classes):
> {code:java}
> Task1 depends on A
> A depends on B
> B depends on C
> C depends on B [cycle]
> Task2 depends on C{code}
> What seems to happen is a deadlock. Specifically, threads are started on the 
> task managers that simultaneously call BatchTask.instantiateUserCode on both 
> Task1 and Task2. This starts deserializing the classes and initializing them. 
> Here's the deadlock scenario, as a stack:
> {code:java}
> Time>
> T1: [deserialize] -> Task1 -> A -> B -> (wait for 
> C)
> T2: [deserialize] -> Task2              -> C -> (wait for 
> B){code}
>  
> A similar scenario from the web: 
> [https://www.farside.org.uk/201510/deadlocks_in_java_class_initialisation] .
>  
> For my specific problem, I'm running into this within Clojure - 
> {{clojure.lang.RT}} has a dep on {{clojure.lang.Util}} which has a dep with 
> {{clojure.lang.Numbers}} which depends on {{clojure.lang.RT}} again. 
> Deserializing different clojure functions calls one or the other first which 
> deadlocks task managers.
>  
> I built a version of flink-core that had 
> {{org.apache.flink.util.InstantiationUtil.readObjectFromConfig}} 
> synchronized, but I'm not sure that it's the proper fix. I'm happy to submit 
> that as a patch, but I'm not familiar enough with the codebase to say that 
> it's the correct solution - ideally all Java class loading is synchronized, 
> but I'm not sure how to do that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] asfgit closed pull request #8561: [FLINK-12588][python] Add TableSchema for Python Table API.

2019-06-09 Thread GitBox
asfgit closed pull request #8561: [FLINK-12588][python] Add TableSchema for 
Python Table API.
URL: https://github.com/apache/flink/pull/8561
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12588) Add TableSchema for Python Table API

2019-06-09 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-12588.
---
Resolution: Fixed

Fixed in master: 8eaa2d038e9672c3a1364e1b6699b2305b8a04cb

> Add TableSchema for Python Table API
> 
>
> Key: FLINK-12588
> URL: https://issues.apache.org/jira/browse/FLINK-12588
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Wei Zhong
>Assignee: Wei Zhong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As discussed in the PR of FLINK-12439, we need to add TableSchema to Python 
> Table API after FLINK-12408 is committed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12541) Add deploy a Python Flink job and session cluster on Kubernetes support.

2019-06-09 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16859554#comment-16859554
 ] 

sunjincheng commented on FLINK-12541:
-

Thanks for the reminder, [~dian.fu]

I think opening only one PR per JIRA is good for Flink. Both creating a new JIRA 
for part 2 and putting the part-2 change as multiple commits in one PR make 
sense to me. So feel free to do whichever you like :)

> Add deploy a Python Flink job and session cluster on Kubernetes support.
> 
>
> Key: FLINK-12541
> URL: https://issues.apache.org/jira/browse/FLINK-12541
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Runtime / REST
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Add deploy a Python Flink job and session cluster on Kubernetes support.
> We need to have the same deployment step as the Java job. Please see: 
> [https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 commented on issue #8623: [FLINK-12719][python] Add the Python catalog API

2019-06-09 Thread GitBox
sunjincheng121 commented on issue #8623: [FLINK-12719][python] Add the Python 
catalog API
URL: https://github.com/apache/flink/pull/8623#issuecomment-500240970
 
 
   @flinkbot approve consensus


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8561: [FLINK-12588][python] Add TableSchema for Python Table API.

2019-06-09 Thread GitBox
sunjincheng121 commented on issue #8561: [FLINK-12588][python] Add TableSchema 
for Python Table API.
URL: https://github.com/apache/flink/pull/8561#issuecomment-500240339
 
 
   +1 to merged.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8653: [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers are recycled when FlinkKafk…

2019-06-09 Thread GitBox
sunjincheng121 commented on issue #8653: [FLINK-10455][Connectors/Kafka] Ensure 
all the KafkaProducers are recycled when FlinkKafk…
URL: https://github.com/apache/flink/pull/8653#issuecomment-500239955
 
 
   @flinkbot approve description 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8653: [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers are recycled when FlinkKafk…

2019-06-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8653: 
[FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers are recycled when 
FlinkKafk…
URL: https://github.com/apache/flink/pull/8653#discussion_r291848024
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ##
 @@ -656,33 +656,40 @@ public void invoke(KafkaTransactionState transaction, IN next, Context context)
 
 	@Override
 	public void close() throws FlinkKafka011Exception {
 -		final KafkaTransactionState currentTransaction = currentTransaction();
 -		if (currentTransaction != null) {
 -			// to avoid exceptions on aborting transactions with some pending records
 -			flush(currentTransaction);
 -
 -			// normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
 -			// we need to close it manually
 -			switch (semantic) {
 -				case EXACTLY_ONCE:
 -					break;
 -				case AT_LEAST_ONCE:
 -				case NONE:
 -					currentTransaction.producer.close();
 -					break;
 -			}
 -		}
 +		// First close the producer for current transaction.
 		try {
 +			final KafkaTransactionState currentTransaction = currentTransaction();
 +			if (currentTransaction != null) {
 +				// to avoid exceptions on aborting transactions with some pending records
 +				flush(currentTransaction);
 +
 +				// normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
 +				// we need to close it manually
 +				switch (semantic) {
 +					case EXACTLY_ONCE:
 +						break;
 +					case AT_LEAST_ONCE:
 +					case NONE:
 +						currentTransaction.producer.close();
 +						break;
 +				}
 +			}
 			super.close();
 -		}
 -		catch (Exception e) {
 +		} catch (Exception e) {
 			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
 +		} finally {
 +			// We may have to close producer of the current transaction in case some exception was thrown before
 +			// the normal close routine finishes.
 +			if (currentTransaction() != null) {
 +				IOUtils.closeQuietly(currentTransaction().producer);
 +			}
 +			// Make sure all the producers for pending transactions are closed.
 +			pendingTransactions().forEach(transaction ->
 +				IOUtils.closeQuietly(transaction.getValue().producer)
 
 Review comment:
   Code format error; details can be found [here](https://api.travis-ci.org/v3/job/542303676/log.txt):
   FlinkKafkaProducer011.java:[688] (regexp) RegexpSinglelineJava: Line has leading space characters; indentation should be performed with tabs only.
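   Not the PR's actual code, but a minimal, self-contained sketch of the close-in-finally pattern this change introduces; `Producer`, `currentProducer`, `pendingProducers` and `closeQuietly` below are hypothetical stand-ins for the Kafka producer, the transaction bookkeeping and `IOUtils.closeQuietly` used in `FlinkKafkaProducer011`:
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /**
    * Minimal sketch (not Flink code) of the "close everything in finally" pattern:
    * even if flushing the current transaction throws, every producer that is still
    * open gets closed, so no network resources leak.
    */
   public class CloseAllProducersSketch implements AutoCloseable {
   
   	/** Hypothetical stand-in for a Kafka producer. */
   	static final class Producer {
   		void flush() { /* flush pending records; may throw in real life */ }
   		void close() { /* release sockets, buffers, threads */ }
   	}
   
   	private Producer currentProducer = new Producer();
   	private final List<Producer> pendingProducers = new ArrayList<>();
   
   	@Override
   	public void close() throws Exception {
   		Exception firstError = null;
   		try {
   			// Flushing may fail; without the finally block below, an exception
   			// here would leak the current producer and all pending ones.
   			if (currentProducer != null) {
   				currentProducer.flush();
   			}
   		} catch (Exception e) {
   			firstError = e;
   		} finally {
   			// Best-effort cleanup: close the current producer and the
   			// producers of all pending transactions, swallowing close-time errors.
   			closeQuietly(currentProducer);
   			pendingProducers.forEach(CloseAllProducersSketch::closeQuietly);
   		}
   		if (firstError != null) {
   			throw firstError;
   		}
   	}
   
   	private static void closeQuietly(Producer producer) {
   		if (producer == null) {
   			return;
   		}
   		try {
   			producer.close();
   		} catch (Exception ignored) {
   			// swallow close-time exceptions; the original error (if any) is rethrown instead
   		}
   	}
   
   	public static void main(String[] args) throws Exception {
   		try (CloseAllProducersSketch sink = new CloseAllProducersSketch()) {
   			// write some records through the sink ...
   		}
   	}
   }
   ```
   The diff above makes the same point: moving the best-effort `close()` calls into `finally` means an exception during `flush()` or `super.close()` can no longer leave producers open.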
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8653: [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers are recycled when FlinkKafk…

2019-06-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8653: 
[FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers are recycled when 
FlinkKafk…
URL: https://github.com/apache/flink/pull/8653#discussion_r291847967
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
 ##
 @@ -16,15 +16,14 @@
 # limitations under the License.
 

 
-log4j.rootLogger=INFO, testlogger
+log4j.rootLogger=OFF, testlogger
 
 Review comment:
   Why should we change this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8225: [FLINK-10984]Remove flink-shaded-hadoop from flink

2019-06-09 Thread GitBox
sunjincheng121 commented on issue #8225: [FLINK-10984]Remove 
flink-shaded-hadoop from flink
URL: https://github.com/apache/flink/pull/8225#issuecomment-500239090
 
 
   Thanks for the review, @zentol!
   I have rebased the code and am waiting for CI to turn green.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8225: [FLINK-10984]Remove flink-shaded-hadoop from flink

2019-06-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8225: [FLINK-10984]Remove 
flink-shaded-hadoop from flink
URL: https://github.com/apache/flink/pull/8225#discussion_r291847332
 
 

 ##
 File path: flink-shaded-yarn-tests/pom.xml
 ##
 @@ -146,42 +172,42 @@ under the License.
 				com.google
 -				org.apache.flink.hadoop.shaded.com.google
 +				org.apache.flink.hadoop2.shaded.com.google
 
 Review comment:
   Without this change, the `YARNHighAvailabilityITCase` test cannot pass.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics …

2019-06-09 Thread GitBox
flinkbot commented on issue #8668: [FLINK-12784][metrics] Support retention 
policy for InfluxDB metrics …
URL: https://github.com/apache/flink/pull/8668#issuecomment-500236796
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   ## Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

