[jira] [Commented] (FLINK-12297) Make ClosureCleaner recursive

2019-06-14 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864624#comment-16864624
 ] 

Aljoscha Krettek commented on FLINK-12297:
--

Merged on release-1.8 in 159c52769045853bc95a0efaae8ab2c7675e1d42

> Make ClosureCleaner recursive
> -
>
> Key: FLINK-12297
> URL: https://issues.apache.org/jira/browse/FLINK-12297
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataSet, API / DataStream
>Affects Versions: 1.8.0
>Reporter: Dawid Wysakowicz
>Assignee: Aitozi
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> Right now we do not invoke the closure cleaner on output tags. Therefore code such as:
> {code}
> @Test
> public void testFlatSelectSerialization() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStreamSource<Integer> elements = env.fromElements(1, 2, 3);
>     OutputTag<Integer> outputTag = new OutputTag<Integer>("AAA") {};
>     CEP.pattern(elements, Pattern.<Integer>begin("A")).flatSelect(
>         outputTag,
>         new PatternFlatTimeoutFunction<Integer, Integer>() {
>             @Override
>             public void timeout(
>                     Map<String, List<Integer>> pattern,
>                     long timeoutTimestamp,
>                     Collector<Integer> out) throws Exception {
>             }
>         },
>         new PatternFlatSelectFunction<Integer, Integer>() {
>             @Override
>             public void flatSelect(Map<String, List<Integer>> pattern, Collector<Integer> out) throws Exception {
>             }
>         }
>     );
>     env.execute();
> }
> {code}
> will fail with a {{The implementation of the PatternFlatSelectAdapter is not serializable.}} exception.
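
For context, a minimal sketch of what "recursive" cleaning means here, assuming a reflection-only approach (Flink's actual ClosureCleaner additionally analyzes bytecode before dropping references, so this is illustrative, not the project's implementation):

{code}
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/** Illustrative only: recursively walk a closure's fields and drop
 *  non-serializable enclosing-instance references ("this$0" etc.). */
public final class RecursiveCleanerSketch {

    public static void clean(Object closure) {
        clean(closure, Collections.newSetFromMap(new IdentityHashMap<>()));
    }

    private static void clean(Object obj, Set<Object> visited) {
        if (obj == null || !visited.add(obj)) {
            return; // null or already visited (guards against cycles)
        }
        for (Class<?> cls = obj.getClass(); cls != null; cls = cls.getSuperclass()) {
            for (Field field : cls.getDeclaredFields()) {
                if (Modifier.isStatic(field.getModifiers()) || field.getType().isPrimitive()) {
                    continue;
                }
                field.setAccessible(true);
                try {
                    Object value = field.get(obj);
                    if (field.getName().startsWith("this$") && !(value instanceof Serializable)) {
                        // anonymous/inner class holds a non-serializable outer instance: drop it
                        field.set(obj, null);
                    } else {
                        // recurse into wrapped functions, e.g. the one inside a PatternFlatSelectAdapter
                        clean(value, visited);
                    }
                } catch (IllegalAccessException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
}
{code}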



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12297) Make ClosureCleaner recursive

2019-06-14 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-12297.

Resolution: Fixed

> Make ClosureCleaner recursive
> -
>
> Key: FLINK-12297
> URL: https://issues.apache.org/jira/browse/FLINK-12297
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataSet, API / DataStream
>Affects Versions: 1.8.0
>Reporter: Dawid Wysakowicz
>Assignee: Aitozi
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> Right now we do not invoke the closure cleaner on output tags. Therefore code such as:
> {code}
> @Test
> public void testFlatSelectSerialization() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStreamSource<Integer> elements = env.fromElements(1, 2, 3);
>     OutputTag<Integer> outputTag = new OutputTag<Integer>("AAA") {};
>     CEP.pattern(elements, Pattern.<Integer>begin("A")).flatSelect(
>         outputTag,
>         new PatternFlatTimeoutFunction<Integer, Integer>() {
>             @Override
>             public void timeout(
>                     Map<String, List<Integer>> pattern,
>                     long timeoutTimestamp,
>                     Collector<Integer> out) throws Exception {
>             }
>         },
>         new PatternFlatSelectFunction<Integer, Integer>() {
>             @Override
>             public void flatSelect(Map<String, List<Integer>> pattern, Collector<Integer> out) throws Exception {
>             }
>         }
>     );
>     env.execute();
> }
> {code}
> will fail with a {{The implementation of the PatternFlatSelectAdapter is not serializable.}} exception.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] aljoscha closed pull request #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function

2019-06-14 Thread GitBox
aljoscha closed pull request #8280: [FLINK-12297]Harden ClosureCleaner to 
handle the wrapped function
URL: https://github.com/apache/flink/pull/8280
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] aljoscha commented on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function

2019-06-14 Thread GitBox
aljoscha commented on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle 
the wrapped function
URL: https://github.com/apache/flink/pull/8280#issuecomment-502340886
 
 
   I merged this! Thanks again to the two of you for working on this. 👍 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12297) Make ClosureCleaner recursive

2019-06-14 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864617#comment-16864617
 ] 

Aljoscha Krettek commented on FLINK-12297:
--

Merged on master in 68cc21e4af71505efa142110e35a1f8b1c25fe6e

> Make ClosureCleaner recursive
> -
>
> Key: FLINK-12297
> URL: https://issues.apache.org/jira/browse/FLINK-12297
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataSet, API / DataStream
>Affects Versions: 1.8.0
>Reporter: Dawid Wysakowicz
>Assignee: Aitozi
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> Right now we do not invoke the closure cleaner on output tags. Therefore code such as:
> {code}
> @Test
> public void testFlatSelectSerialization() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStreamSource<Integer> elements = env.fromElements(1, 2, 3);
>     OutputTag<Integer> outputTag = new OutputTag<Integer>("AAA") {};
>     CEP.pattern(elements, Pattern.<Integer>begin("A")).flatSelect(
>         outputTag,
>         new PatternFlatTimeoutFunction<Integer, Integer>() {
>             @Override
>             public void timeout(
>                     Map<String, List<Integer>> pattern,
>                     long timeoutTimestamp,
>                     Collector<Integer> out) throws Exception {
>             }
>         },
>         new PatternFlatSelectFunction<Integer, Integer>() {
>             @Override
>             public void flatSelect(Map<String, List<Integer>> pattern, Collector<Integer> out) throws Exception {
>             }
>         }
>     );
>     env.execute();
> }
> {code}
> will fail with a {{The implementation of the PatternFlatSelectAdapter is not serializable.}} exception.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] aljoscha closed pull request #8744: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function

2019-06-14 Thread GitBox
aljoscha closed pull request #8744: [FLINK-12297]Harden ClosureCleaner to 
handle the wrapped function
URL: https://github.com/apache/flink/pull/8744
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12855) Stagger panes on partitions to distribute workload.

2019-06-14 Thread Teng Hu (JIRA)
Teng Hu created FLINK-12855:
---

 Summary: Stagger panes on partitions to distribute workload.
 Key: FLINK-12855
 URL: https://issues.apache.org/jira/browse/FLINK-12855
 Project: Flink
  Issue Type: New Feature
  Components: API / DataStream
Reporter: Teng Hu
 Attachments: stagger_window.png, stagger_window_delay.png, 
stagger_window_throughput.png

Flink natively triggers all panes belonging to the same window at the same time. In 
other words, all panes are aligned and their triggers all fire simultaneously, 
causing a thundering herd effect.

This new feature provides the option to stagger panes across partitioned streams, 
so that their workloads are distributed.

Attachment: proof of concept working
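
One minimal sketch of the idea (an assumption for illustration, not the attached proof of concept): derive a per-key offset so that panes of the same size start, and hence fire, at different times across partitions.

{code}
/** Illustrative only: stagger tumbling-pane boundaries per key so triggers
 *  do not all fire at the same instant across partitions. */
public final class StaggeredPaneSketch {

    /** Start of the staggered pane that the given timestamp falls into. */
    static long paneStart(long timestamp, long paneSizeMs, Object key) {
        // spread keys over [0, paneSizeMs) to stagger their trigger times
        long offset = Math.floorMod(key.hashCode(), paneSizeMs);
        // align to the pane size, shifted by the per-key offset
        return timestamp - Math.floorMod(timestamp - offset, paneSizeMs);
    }

    public static void main(String[] args) {
        long size = 60_000L; // one-minute panes
        System.out.println(paneStart(1_000_000L, size, "partition-a"));
        System.out.println(paneStart(1_000_000L, size, "partition-b"));
    }
}
{code}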






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12855) Stagger panes on partitions to distribute workload.

2019-06-14 Thread Teng Hu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Teng Hu updated FLINK-12855:

Attachment: (was: stagger_window_delay.png)

> Stagger panes on partitions to distribute workload.
> ---
>
> Key: FLINK-12855
> URL: https://issues.apache.org/jira/browse/FLINK-12855
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Reporter: Teng Hu
>Priority: Major
> Attachments: stagger_window.png, stagger_window_delay.png, 
> stagger_window_throughput.png
>
>
> Flink natively triggers all panes belonging to the same window at the same time. 
> In other words, all panes are aligned and their triggers all fire 
> simultaneously, causing a thundering herd effect.
> This new feature provides the option to stagger panes across 
> partitioned streams, so that their workloads are distributed.
> Attachment: proof of concept working



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12855) Stagger panes on partitions to distribute workload.

2019-06-14 Thread Teng Hu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Teng Hu updated FLINK-12855:

Attachment: stagger_window.png

> Stagger panes on partitions to distribute workload.
> ---
>
> Key: FLINK-12855
> URL: https://issues.apache.org/jira/browse/FLINK-12855
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Reporter: Teng Hu
>Priority: Major
> Attachments: stagger_window.png, stagger_window_delay.png, 
> stagger_window_throughput.png
>
>
> Flink natively triggers all panes belonging to the same window at the same time. 
> In other words, all panes are aligned and their triggers all fire 
> simultaneously, causing a thundering herd effect.
> This new feature provides the option to stagger panes across 
> partitioned streams, so that their workloads are distributed.
> Attachment: proof of concept working



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] mans2singh commented on a change in pull request #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics …

2019-06-14 Thread GitBox
mans2singh commented on a change in pull request #8668: [FLINK-12784][metrics] 
Support retention policy for InfluxDB metrics …
URL: https://github.com/apache/flink/pull/8668#discussion_r294030972
 
 

 ##
 File path: 
flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/InfluxdbReporterTest.java
 ##
 @@ -96,33 +96,36 @@ public void testMetricRegistration() throws Exception {
 
@Test
public void testMetricReporting() throws Exception {
-   MetricRegistryImpl metricRegistry = createMetricRegistry();
+   String retentionPolicy = "one_hour";
+   MetricRegistryImpl metricRegistry = 
createMetricRegistry(retentionPolicy);
try {
String metricName = "TestCounter";
Counter counter = registerTestMetric(metricName, 
metricRegistry);
counter.inc(42);
 
stubFor(post(urlPathEqualTo("/write"))
-   .willReturn(aResponse()
-   .withStatus(200)));
+   .willReturn(aResponse()
+   .withStatus(200)));
 
 Review comment:
   @1u0, @Myasuka - Please let me know if I can resolve this issue and if there 
is anything else required for this PR.  Thanks for your advice.
   
   Mans


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wangkuiyi opened a new pull request #7015: Add the -p option to nc

2019-06-14 Thread GitBox
wangkuiyi opened a new pull request #7015: Add the -p option to nc
URL: https://github.com/apache/flink/pull/7015
 
 
   ## Purpose
   
   When I run the [quick 
start](https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/setup_quickstart.html)
 tutorial in Docker containers (`docker run flink`), the 
`examples/streaming/SocketWindowWordCount.jar` job fails if it listens on a 
netcat process started with `nc -l 9000`. The tutorial works if I change the 
command to `nc -l -p 9000`.
   
   ## Verifying
   
   To reproduce the failure and verify the fix:
   
   1. In a terminal, start the container: `docker run --rm -it --name my_flink 
flink bash`.
   1. In the container, install and run netcat: `apt-get update && apt-get 
install -y netcat && nc -l 9000`.
   1. In another terminal, attach to the container: `docker exec -it my_flink 
bash`.
   1. In the attached terminal session, start the cluster with `start-cluster.sh` 
and run the example: `flink run examples/streaming/SocketWindowWordCount.jar 
--port 9000`.
   
   The example fails with the following errors. The failure can be avoided by 
redoing the above steps with `nc -l -p 9000`.
   
   ```
   root@493ef04c41ee:/opt/flink# flink run 
examples/streaming/SocketWindowWordCount.jar --port 9000
   Starting execution of program
   
   
The program finished with the following exception:
   
   org.apache.flink.client.program.ProgramInvocationException: Job failed. 
(JobID: b58996c431847f40852dad17f0d21244)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at 
org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:816)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:290)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)
at 
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
   Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
execution failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
... 17 more
   Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at 
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at 
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at 
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
   ```
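
   For reference, the connection refused above comes from the example's socket source. A minimal sketch of the consuming side (host and port are assumed here to match the quickstart, not taken from the example's actual argument parsing):

   ```java
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

   public class SocketReadSketch {
       public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           // Connects to the netcat process; throws ConnectException if nothing is
           // listening on port 9000, which is what happens when `nc -l 9000` does not bind the port.
           env.socketTextStream("localhost", 9000)
               .print();
           env.execute("Socket read sketch");
       }
   }
   ```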
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for 

[GitHub] [flink] wangkuiyi commented on issue #7015: Add the -p option to nc

2019-06-14 Thread GitBox
wangkuiyi commented on issue #7015: Add the -p option to nc
URL: https://github.com/apache/flink/pull/7015#issuecomment-502324878
 
 
   Thanks for following up @greghogan and @zentol . I am closing this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wangkuiyi closed pull request #7015: Add the -p option to nc

2019-06-14 Thread GitBox
wangkuiyi closed pull request #7015: Add the -p option to nc
URL: https://github.com/apache/flink/pull/7015
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wangkuiyi closed pull request #7015: Add the -p option to nc

2019-06-14 Thread GitBox
wangkuiyi closed pull request #7015: Add the -p option to nc
URL: https://github.com/apache/flink/pull/7015
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] XuPingyong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL

2019-06-14 Thread GitBox
XuPingyong commented on a change in pull request #8718: 
[FLINK-12824][table-planner-blink] Set parallelism for stream SQL
URL: https://github.com/apache/flink/pull/8718#discussion_r294029650
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecCalc.scala
 ##
 @@ -116,11 +116,16 @@ class StreamExecCalc(
   retainHeader = true,
   "StreamExecCalc"
 )
-new OneInputTransformation(
+val ret = new OneInputTransformation(
   inputTransform,
   RelExplainUtil.calcToString(calcProgram, getExpressionString),
   substituteStreamOperator,
   BaseRowTypeInfo.of(outputType),
-  inputTransform.getParallelism)
+  getResource.getParallelism)
+
+if (getResource.getMaxParallelism > 0) {
+  ret.setMaxParallelism(getResource.getMaxParallelism)
 
 Review comment:
   It just transfers the result of the parallelism calculation, which decides 
whether to set a max parallelism, to the StreamTransformation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] XuPingyong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL

2019-06-14 Thread GitBox
XuPingyong commented on a change in pull request #8718: 
[FLINK-12824][table-planner-blink] Set parallelism for stream SQL
URL: https://github.com/apache/flink/pull/8718#discussion_r294029548
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/parallelism/FinalParallelismSetter.java
 ##
 @@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.parallelism;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+import 
org.apache.flink.table.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import 
org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
+import 
org.apache.flink.table.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import 
org.apache.flink.table.plan.nodes.physical.stream.StreamExecTableSourceScan;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Values;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Set final parallelism if needed at the beginning time, if parallelism of a 
node is set to be final,
+ * it will not be changed by other parallelism calculator.
+ */
+public class FinalParallelismSetter {
+
+   private final StreamExecutionEnvironment env;
+   private Set<ExecNode<?, ?>> calculatedNodeSet = new HashSet<>();
+   private Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap = new 
HashMap<>();
+
+   private FinalParallelismSetter(StreamExecutionEnvironment env) {
+   this.env = env;
+   }
+
+   /**
+* Finding nodes that need to set final parallelism.
+*/
+   public static Map<ExecNode<?, ?>, Integer> 
calculate(StreamExecutionEnvironment env, List<ExecNode<?, ?>> sinkNodes) {
+   FinalParallelismSetter setter = new FinalParallelismSetter(env);
+   sinkNodes.forEach(setter::calculate);
+   return setter.finalParallelismNodeMap;
+   }
+
+   private void calculate(ExecNode<?, ?> execNode) {
+   if (!calculatedNodeSet.add(execNode)) {
+   return;
+   }
+   if (execNode instanceof BatchExecTableSourceScan) {
+   calculateTableSource((BatchExecTableSourceScan) 
execNode);
+   } else if (execNode instanceof StreamExecTableSourceScan) {
+   calculateTableSource((StreamExecTableSourceScan) 
execNode);
+   } else if (execNode instanceof BatchExecBoundedStreamScan) {
+   calculateBoundedStreamScan((BatchExecBoundedStreamScan) 
execNode);
+   } else if (execNode instanceof StreamExecDataStreamScan) {
+   calculateDataStreamScan((StreamExecDataStreamScan) 
execNode);
+   } else if (execNode instanceof Values) {
+   calculateValues(execNode);
+   } else {
+   calculateIfSingleton(execNode);
+   }
+   }
+
+   private void calculateTableSource(BatchExecTableSourceScan 
tableSourceScan) {
+   StreamTransformation transformation = 
tableSourceScan.getSourceTransformation(env);
+   if (transformation.getMaxParallelism() > 0) {
+   
tableSourceScan.getResource().setMaxParallelism(transformation.getMaxParallelism());
 
 Review comment:
   The shuffleStage parallelism calculation that happens later in 
ShuffleStageParallelismCalculator takes the max parallelism of the exec nodes into account.
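
   A minimal sketch of the capping being described (names are illustrative, not the planner's actual code):

   ```java
   // Illustrative only: a later parallelism calculator caps the derived parallelism
   // by the node's max parallelism when one has been recorded.
   public final class ParallelismCapSketch {
       static int resolveParallelism(int calculatedParallelism, int maxParallelism) {
           return maxParallelism > 0 ? Math.min(calculatedParallelism, maxParallelism) : calculatedParallelism;
       }

       public static void main(String[] args) {
           System.out.println(resolveParallelism(128, 32)); // 32
           System.out.println(resolveParallelism(128, -1)); // 128 (no max recorded)
       }
   }
   ```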


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8700: [FLINK-12657][hive] Integrate Flink with Hive UDF

2019-06-14 Thread GitBox
bowenli86 commented on issue #8700: [FLINK-12657][hive] Integrate Flink with 
Hive UDF
URL: https://github.com/apache/flink/pull/8700#issuecomment-502312751
 
 
   I created FLINK-12854 to evaluate if we should support primitive types and 
convert them for Hive functions. This is not critical and thus may or may not 
happen in 1.9.
   
   @JingsongLi Can you take another look?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12854) evaluate and support for primitive array in Hive functions

2019-06-14 Thread Bowen Li (JIRA)
Bowen Li created FLINK-12854:


 Summary: evaluate and support for primitive array in Hive functions
 Key: FLINK-12854
 URL: https://issues.apache.org/jira/browse/FLINK-12854
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Reporter: Bowen Li


Flink currently supports primitive arrays, and Hive functions don't. We need to 
evaluate whether we should support primitive arrays by converting them to a list 
or an array of boxed types and then passing them to the Hive function. If the 
evaluation is positive, we should implement this feature.
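
A minimal sketch of the conversion under discussion (illustrative only, not the eventual Flink API): box a primitive array before handing it to a Hive function.

{code}
import java.util.ArrayList;
import java.util.List;

public final class PrimitiveArrayBoxingSketch {

    /** Box an int[] into a List<Integer>, the shape a Hive UDF can consume. */
    static List<Integer> box(int[] values) {
        List<Integer> boxed = new ArrayList<>(values.length);
        for (int v : values) {
            boxed.add(v); // auto-boxing int -> Integer
        }
        return boxed;
    }

    public static void main(String[] args) {
        System.out.println(box(new int[] {1, 2, 3})); // [1, 2, 3]
    }
}
{code}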



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12705) Allow user to specify the Hive version in use

2019-06-14 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-12705.

Resolution: Fixed

merged in 1.9.0: ec383d6c737fa0bc24ec6984bbc70207f8da9c53

> Allow user to specify the Hive version in use
> -
>
> Key: FLINK-12705
> URL: https://issues.apache.org/jira/browse/FLINK-12705
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: Rui Li
>Assignee: Rui Li
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Follow up of FLINK-12649



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] asfgit closed pull request #8694: [FLINK-12705][hive] Allow user to specify the Hive version in use

2019-06-14 Thread GitBox
asfgit closed pull request #8694: [FLINK-12705][hive] Allow user to specify the 
Hive version in use
URL: https://github.com/apache/flink/pull/8694
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8720: [FLINK-12771][hive] Support ConnectorCatalogTable in HiveCatalog

2019-06-14 Thread GitBox
bowenli86 commented on issue #8720: [FLINK-12771][hive] Support 
ConnectorCatalogTable in HiveCatalog
URL: https://github.com/apache/flink/pull/8720#issuecomment-502237063
 
 
   @xuefuz good idea to add tests for mixed operations, I'll add them. As far as I 
can think of for now, there should be no impact on db operations, except that dropping 
a database needs to ensure it also doesn't have temp tables. Will add that too.
   
   I'll hold off on this PR a little bit until we reach consensus with Dawid
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Myasuka commented on issue #8745: [FLINK-11662] Disable task to fail on checkpoint errors

2019-06-14 Thread GitBox
Myasuka commented on issue #8745: [FLINK-11662] Disable task to fail on 
checkpoint errors
URL: https://github.com/apache/flink/pull/8745#issuecomment-502235022
 
 
   @flinkbot attention @StefanRRichter 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8745: [FLINK-11662] Disable task to fail on checkpoint errors

2019-06-14 Thread GitBox
flinkbot commented on issue #8745: [FLINK-11662] Disable task to fail on 
checkpoint errors
URL: https://github.com/apache/flink/pull/8745#issuecomment-502234862
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Myasuka opened a new pull request #8745: Task not fail checkpoint

2019-06-14 Thread GitBox
Myasuka opened a new pull request #8745: Task not fail checkpoint
URL: https://github.com/apache/flink/pull/8745
 
 
   ## What is the purpose of the change
   
   FLINK-11662 has been a known and urgent issue since the Flink 1.8 release. There 
are two parts to fixing this problem: one is on the task side, disabling the task from 
failing on checkpoint errors. The other part is to let the JM decide when to fail 
the whole job, which has been part of the work in 
https://github.com/apache/flink/pull/8322. 
   
   This PR mainly focuses on the task side and is based on 
https://github.com/apache/flink/pull/8322 before it is merged, since that PR is very 
unlikely to still have significant changes at this point.
   
   ## Brief change log
   
 - Deprecate `isFailTaskOnCheckpointError` and 
`setFailTaskOnCheckpointError` in `ExecutionConfig`. 
- Deprecate `isFailOnCheckpointingErrors` and 
`setFailOnCheckpointingErrors` in `CheckpointConfig`. 
 - Remove `FailingCheckpointExceptionHandler`
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - Refactor `CheckpointExceptionHandlerConfigurationTest` and 
`CheckpointExceptionHandlerTest` to verify the default logic changed.
- Refactor `StreamTaskTest` to verify the default logic changed on task 
side.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **yes**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **JavaDocs**
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog

2019-06-14 Thread GitBox
bowenli86 commented on a change in pull request #8636: 
[FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8636#discussion_r293931882
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -1083,8 +1136,14 @@ public void alterPartitionColumnStatistics(ObjectPath 
tablePath, CatalogPartitio
}
 
@Override
-   public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) 
throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) 
throws TableNotExistException,
+   CatalogException, TableNotPartitionedException {
+   Table hiveTable = getHiveTable(tablePath);
+   if (!isTablePartitioned(hiveTable)) {
+   return 
createCatalogTableStatistics(hiveTable.getParameters());
+   } else {
+   throw new TableNotPartitionedException(getName(), 
tablePath);
 
 Review comment:
   First, shouldn't this be a `TablePartitionedException`?
   Second, please correct me if I'm wrong, IIRC, if the table is partitioned, 
wouldn't Hive client return unknown stats?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog

2019-06-14 Thread GitBox
bowenli86 commented on a change in pull request #8636: 
[FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8636#discussion_r293934696
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
 ##
 @@ -1082,6 +1083,64 @@ public void testListPartitionPartialSpec() throws 
Exception {
assertEquals(1, catalog.listPartitions(path1, 
createAnotherPartitionSpecSubset()).size());
}
 
+
+   // -- table and column stats --
+
+   @Test
+   public void testGetTableStats_TableNotExistException() throws Exception{
+   catalog.createDatabase(db1, createDb(), false);
+   
exception.expect(org.apache.flink.table.api.TableNotExistException.class);
+   catalog.getTableStatistics(path1);
+   }
+
+   @Test
+   public void testGetPartitionStats() throws Exception{
+   catalog.createDatabase(db1, createDb(), false);
+   catalog.createTable(path1, createPartitionedTable(), false);
+   catalog.createPartition(path1, createPartitionSpec(), 
createPartition(), false);
+   // there're essentially no stats, so nothing to assert
+   catalog.getPartitionStatistics(path1, createPartitionSpec());
 
 Review comment:
   shouldn't we just assert every stats is the default value which is `0`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog

2019-06-14 Thread GitBox
bowenli86 commented on a change in pull request #8636: 
[FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8636#discussion_r293934522
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -1060,21 +1061,73 @@ private static Function 
instantiateHiveFunction(ObjectPath functionPath, HiveCat
);
}
 
+   private boolean isTablePartitioned(Table hiveTable) {
+   return hiveTable.getPartitionKeysSize() != 0;
+   }
+
// -- stats --
 
@Override
public void alterTableStatistics(ObjectPath tablePath, 
CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws 
TableNotExistException, CatalogException {
-
+   try {
+   Table hiveTable = getHiveTable(tablePath);
+   // Set table stats
+   if (needToUpdateStatistics(tableStatistics, 
hiveTable.getParameters())) {
+   updateStatisticsParameters(tableStatistics, 
hiveTable.getParameters());
+   client.alter_table(tablePath.getDatabaseName(), 
tablePath.getObjectName(), hiveTable);
+   }
+   } catch (TableNotExistException e) {
+   if (!ignoreIfNotExists) {
+   throw e;
+   }
+   } catch (TException e) {
+   throw new CatalogException(String.format("Failed to 
alter table stats of table %s", tablePath.getFullName()), e);
+   }
}
 
@Override
public void alterTableColumnStatistics(ObjectPath tablePath, 
CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws 
TableNotExistException, CatalogException {
 
}
 
+   private static boolean needToUpdateStatistics(CatalogTableStatistics 
statistics, Map<String, String> oldParameters) {
+   String oldRowCount = 
oldParameters.getOrDefault(StatsSetupConst.ROW_COUNT, "0");
+   String oldTotalSize = 
oldParameters.getOrDefault(StatsSetupConst.TOTAL_SIZE, "0");
+   String oldNumFiles = 
oldParameters.getOrDefault(StatsSetupConst.NUM_FILES, "0");
+   String oldRawDataSize = 
oldParameters.getOrDefault(StatsSetupConst.RAW_DATA_SIZE, "0");
+   return statistics.getRowCount() != Long.parseLong(oldRowCount) 
|| statistics.getTotalSize() != Long.parseLong(oldTotalSize)
+   || statistics.getFileCount() != 
Integer.parseInt(oldNumFiles) || statistics.getRawDataSize() != 
Long.parseLong(oldRawDataSize);
+   }
+
+   private static void updateStatisticsParameters(CatalogTableStatistics 
tableStatistics, Map<String, String> parameters) {
+   parameters.put(StatsSetupConst.ROW_COUNT, 
String.valueOf(tableStatistics.getRowCount()));
+   parameters.put(StatsSetupConst.TOTAL_SIZE, 
String.valueOf(tableStatistics.getTotalSize()));
+   parameters.put(StatsSetupConst.NUM_FILES, 
String.valueOf(tableStatistics.getFileCount()));
+   parameters.put(StatsSetupConst.RAW_DATA_SIZE, 
String.valueOf(tableStatistics.getRawDataSize()));
+   }
+
+   private static CatalogTableStatistics 
createCatalogTableStatistics(Map<String, String> parameters) {
+   long rowRount = 
Long.parseLong(parameters.getOrDefault(StatsSetupConst.ROW_COUNT, "0"));
 
 Review comment:
   should reuse the `0`s in `HiveStatsUtil` as mentioned above to be consistent


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog

2019-06-14 Thread GitBox
bowenli86 commented on a change in pull request #8636: 
[FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8636#discussion_r293932672
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
 ##
 @@ -1082,6 +1083,64 @@ public void testListPartitionPartialSpec() throws 
Exception {
assertEquals(1, catalog.listPartitions(path1, 
createAnotherPartitionSpecSubset()).size());
}
 
+
+   // -- table and column stats --
+
+   @Test
+   public void testGetTableStats_TableNotExistException() throws Exception{
+   catalog.createDatabase(db1, createDb(), false);
+   
exception.expect(org.apache.flink.table.api.TableNotExistException.class);
 
 Review comment:
   the `TableNotExistException` is not the class we want. We should use 
`org.apache.flink.table.catalog.exceptions.TableNotExistException`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog

2019-06-14 Thread GitBox
bowenli86 commented on a change in pull request #8636: 
[FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8636#discussion_r293934316
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -1060,21 +1061,73 @@ private static Function 
instantiateHiveFunction(ObjectPath functionPath, HiveCat
);
}
 
+   private boolean isTablePartitioned(Table hiveTable) {
+   return hiveTable.getPartitionKeysSize() != 0;
+   }
+
// -- stats --
 
@Override
public void alterTableStatistics(ObjectPath tablePath, 
CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws 
TableNotExistException, CatalogException {
-
+   try {
+   Table hiveTable = getHiveTable(tablePath);
+   // Set table stats
+   if (needToUpdateStatistics(tableStatistics, 
hiveTable.getParameters())) {
+   updateStatisticsParameters(tableStatistics, 
hiveTable.getParameters());
+   client.alter_table(tablePath.getDatabaseName(), 
tablePath.getObjectName(), hiveTable);
+   }
+   } catch (TableNotExistException e) {
+   if (!ignoreIfNotExists) {
+   throw e;
+   }
+   } catch (TException e) {
+   throw new CatalogException(String.format("Failed to 
alter table stats of table %s", tablePath.getFullName()), e);
+   }
}
 
@Override
public void alterTableColumnStatistics(ObjectPath tablePath, 
CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws 
TableNotExistException, CatalogException {
 
}
 
+   private static boolean needToUpdateStatistics(CatalogTableStatistics 
statistics, Map<String, String> oldParameters) {
+   String oldRowCount = 
oldParameters.getOrDefault(StatsSetupConst.ROW_COUNT, "0");
 
 Review comment:
   These are basically setting default values for all types of Hive table 
stats. How about moving the `0`s to `HiveStatsUtil` and defining them as `public 
static final` vars to make them easier to find and access across the hive catalog 
code base?
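
   A minimal sketch of that suggestion (the constant name and placement are illustrative):

   ```java
   // Illustrative only: shared default used as the fallback for Hive stats parameters,
   // e.g. parameters.getOrDefault(StatsSetupConst.ROW_COUNT, HiveStatsUtil.DEFAULT_STATS_VALUE).
   public class HiveStatsUtil {

       public static final String DEFAULT_STATS_VALUE = "0";

       private HiveStatsUtil() {
       }
   }
   ```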


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog

2019-06-14 Thread GitBox
bowenli86 commented on a change in pull request #8636: 
[FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8636#discussion_r293932104
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -1094,7 +1153,15 @@ public CatalogColumnStatistics 
getTableColumnStatistics(ObjectPath tablePath) th
 
@Override
public CatalogTableStatistics getPartitionStatistics(ObjectPath 
tablePath, CatalogPartitionSpec partitionSpec) throws 
PartitionNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   try {
+   Partition partition = getHivePartition(tablePath, 
partitionSpec);
+   return 
createCatalogTableStatistics(partition.getParameters());
+   } catch (TableNotExistException | PartitionSpecInvalidException 
e) {
+   throw new PartitionNotExistException(getName(), 
tablePath, partitionSpec, e);
+   } catch (TException e) {
+   throw new CatalogException(String.format("Failed to get 
table stats of table %s 's partition %s",
 
 Review comment:
   ```suggestion
throw new CatalogException(String.format("Failed to get 
partition stats of table %s 's partition %s",
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog

2019-06-14 Thread GitBox
bowenli86 commented on a change in pull request #8636: 
[FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8636#discussion_r293935284
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
 ##
 @@ -1082,6 +1083,64 @@ public void testListPartitionPartialSpec() throws 
Exception {
assertEquals(1, catalog.listPartitions(path1, 
createAnotherPartitionSpecSubset()).size());
}
 
+
+   // -- table and column stats --
+
+   @Test
+   public void testGetTableStats_TableNotExistException() throws Exception{
+   catalog.createDatabase(db1, createDb(), false);
+   
exception.expect(org.apache.flink.table.api.TableNotExistException.class);
+   catalog.getTableStatistics(path1);
+   }
+
+   @Test
+   public void testGetPartitionStats() throws Exception{
+   catalog.createDatabase(db1, createDb(), false);
+   catalog.createTable(path1, createPartitionedTable(), false);
+   catalog.createPartition(path1, createPartitionSpec(), 
createPartition(), false);
+   // there're essentially no stats, so nothing to assert
+   catalog.getPartitionStatistics(path1, createPartitionSpec());
+   }
+
+   @Test
+   public void testAlterTableStats() throws Exception{
+   // Non-partitioned table
+   catalog.createDatabase(db1, createDb(), false);
+   CatalogTable table = createTable();
+   catalog.createTable(path1, table, false);
+   CatalogTableStatistics tableStats = new 
CatalogTableStatistics(100, 10, 1000, 1);
+   catalog.alterTableStatistics(path1, tableStats, false);
+   CatalogTableStatistics actual = 
catalog.getTableStatistics(path1);
+
+   assertEquals(tableStats.toString(), actual.toString());
+   }
+
+   @Test
+   public void testAlterTableStats_partitionedTable() throws Exception {
+   // alterTableStats() should do nothing for partitioned tables
+   // getTableStats() should return empty column stats for 
partitioned tables
+   catalog.createDatabase(db1, createDb(), false);
+   catalog.createTable(path1, createPartitionedTable(), false);
+
+   CatalogTableStatistics stats = new CatalogTableStatistics(100, 
1, 1000, 1);
+
 
 Review comment:
   > // alterTableStats() should do nothing for partitioned tables
// getTableStats() should return empty column stats for 
partitioned tables
   
   as the comment describes, we should also assert 
`assertEquals(CatalogTableStatistics.UNKNOWN, 
catalog.getTableStatistics(path1));` before the altering


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog

2019-06-14 Thread GitBox
bowenli86 commented on a change in pull request #8636: 
[FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8636#discussion_r293932284
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -1060,21 +1061,73 @@ private static Function 
instantiateHiveFunction(ObjectPath functionPath, HiveCat
);
}
 
+   private boolean isTablePartitioned(Table hiveTable) {
+   return hiveTable.getPartitionKeysSize() != 0;
+   }
+
// -- stats --
 
@Override
public void alterTableStatistics(ObjectPath tablePath, 
CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws 
TableNotExistException, CatalogException {
-
+   try {
+   Table hiveTable = getHiveTable(tablePath);
+   // Set table stats
+   if (needToUpdateStatistics(tableStatistics, 
hiveTable.getParameters())) {
+   updateStatisticsParameters(tableStatistics, 
hiveTable.getParameters());
+   client.alter_table(tablePath.getDatabaseName(), 
tablePath.getObjectName(), hiveTable);
+   }
+   } catch (TableNotExistException e) {
+   if (!ignoreIfNotExists) {
+   throw e;
+   }
+   } catch (TException e) {
+   throw new CatalogException(String.format("Failed to 
alter table stats of table %s", tablePath.getFullName()), e);
+   }
}
 
@Override
public void alterTableColumnStatistics(ObjectPath tablePath, 
CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws 
TableNotExistException, CatalogException {
 
}
 
+   private static boolean needToUpdateStatistics(CatalogTableStatistics 
statistics, Map<String, String> oldParameters) {
+   String oldRowCount = 
oldParameters.getOrDefault(StatsSetupConst.ROW_COUNT, "0");
+   String oldTotalSize = 
oldParameters.getOrDefault(StatsSetupConst.TOTAL_SIZE, "0");
+   String oldNumFiles = 
oldParameters.getOrDefault(StatsSetupConst.NUM_FILES, "0");
+   String oldRawDataSize = 
oldParameters.getOrDefault(StatsSetupConst.RAW_DATA_SIZE, "0");
+   return statistics.getRowCount() != Long.parseLong(oldRowCount) 
|| statistics.getTotalSize() != Long.parseLong(oldTotalSize)
+   || statistics.getFileCount() != 
Integer.parseInt(oldNumFiles) || statistics.getRawDataSize() != 
Long.parseLong(oldRawDataSize);
+   }
+
+   private static void updateStatisticsParameters(CatalogTableStatistics 
tableStatistics, Map<String, String> parameters) {
+   parameters.put(StatsSetupConst.ROW_COUNT, 
String.valueOf(tableStatistics.getRowCount()));
+   parameters.put(StatsSetupConst.TOTAL_SIZE, 
String.valueOf(tableStatistics.getTotalSize()));
+   parameters.put(StatsSetupConst.NUM_FILES, 
String.valueOf(tableStatistics.getFileCount()));
+   parameters.put(StatsSetupConst.RAW_DATA_SIZE, 
String.valueOf(tableStatistics.getRawDataSize()));
+   }
+
+   private static CatalogTableStatistics 
createCatalogTableStatistics(Map<String, String> parameters) {
+   long rowRount = 
Long.parseLong(parameters.getOrDefault(StatsSetupConst.ROW_COUNT, "0"));
+   long totalSize = 
Long.parseLong(parameters.getOrDefault(StatsSetupConst.TOTAL_SIZE, "0"));
+   int numFiles = 
Integer.parseInt(parameters.getOrDefault(StatsSetupConst.NUM_FILES, "0"));
+   long rawDataSize = 
Long.parseLong(parameters.getOrDefault(StatsSetupConst.RAW_DATA_SIZE, "0"));
+   return new CatalogTableStatistics(rowRount, numFiles, 
totalSize, rawDataSize);
+   }
+
@Override
public void alterPartitionStatistics(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, 
boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
-
+   try {
+   Partition hivePartition = getHivePartition(tablePath, 
partitionSpec);
+   // Set table stats
+   if (needToUpdateStatistics(partitionStatistics, 
hivePartition.getParameters())) {
+   updateStatisticsParameters(partitionStatistics, 
hivePartition.getParameters());
+   
client.alter_partition(tablePath.getDatabaseName(), tablePath.getObjectName(), 
hivePartition);
+   }
+   } catch (TableNotExistException | PartitionSpecInvalidException 
e) {
+   throw new PartitionNotExistException(getName(), 
tablePath, partitionSpec, e);
+   } catch (TException e) {
+ 

[jira] [Comment Edited] (FLINK-12771) Support ConnectorCatalogTable in HiveCatalog

2019-06-14 Thread Xuefu Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864356#comment-16864356
 ] 

Xuefu Zhang edited comment on FLINK-12771 at 6/14/19 6:49 PM:
--

Hi [~dawidwys], 

Re: #2, communicating with the Hive metastore is the main reason for having 
HiveCatalog, but it doesn't necessarily mean that's all it can do. In Hive, a 
user can create temporary tables/functions via DDL, which requires no 
communication with the Hive metastore. Such session-specific objects are stored in 
the user session (or client). (In fact, the Hive metastore provides a type of client 
that has this functionality built in.) Those objects share the same namespace 
with the persistent objects stored in the Hive metastore. I'd argue that Flink's 
temp objects such as inline tables have the same nature and can be handled in 
a similar way. I agree this isn't the only solution. Storing them in a 
dedicated, in-memory catalog is one possibility, and having a session-specific 
structure holding them is another. We feel that extending the capability of 
HiveCatalog to store and share them has greater advantages.

Re: #3, persistence is just one part of the store, but we have decoupled the 
types of the objects to be stored from the nature of the catalog. This happened 
when we hosted both Flink tables and Hive tables via HiveCatalog. That is, a 
catalog may store any type of table. The work here is just an extension of 
that. Along the same line, we shouldn't stop a user from defining either a Hive 
table, a generic table, or an inline table in an in-memory catalog. If a Hive 
table is correctly defined in an in-memory catalog, Flink has no problem reading/writing 
that table. The difference is that the table definition doesn't survive beyond the 
user session. In summary, we decoupled the object types from the types of the 
catalog.

I understand there might be a perception that anything registered in 
HiveCatalog should be persisted. We know this isn't true for Hive's temporary 
tables and functions. I think we just need to educate users that certain tables 
(such as inline tables) are temporary in nature and valid only in the current 
session, which was already true before HiveCatalog was introduced.

Please let me know if there are more questions or comments.

P.S. I just noticed that Bowen had responded, but I guess it's okay if there are 
some duplicated points.




was (Author: xuefuz):
Hi [~dawidwys], 

Re: #2, communicating with Hive metastore is the main reason for having 
HiveCatalog, but it doesn't necessarily mean that's all it can do. In Hive, a 
user can create temporary tables/functions via DDL, which requires no 
communication to Hive metastore. Such session specific objects are stored in 
user session (or client). (In fact, Hive metastore provided a type of client 
that has this functionality built-in.) Those objects share the same namespace 
with the persistent objects stored in Hive metastore. I'd argue that Flink's 
temp objects such as inline tables have the same nature, and can be handled in 
a similar way. I agree this isn't the only solution. Storing them in a 
dedicated, in memory catalog is one possibility, and having a session-specific 
structure holding them is another. We feel extending the capability of 
HiveCatalog to share and store them having greater advantages.

Re: #3, persistence is just one part of the store, but we have decoupled the 
types of the object to be stored and the nature of the catalog. This happened 
when we hosted both Flink tables and Hive tables via HiveCatalog. That is, a 
catalog may store any type of tables. The work here is just an extension to 
that. Along the same idea, we shouldn't stop a user to define either a Hive 
table, a generic table, or an inline table in an in-memory catalog. If a hive 
catalog is correctly define in in-memory catalog, Flink has no problem 
read/write that table. The difference is that the table definition doesn't 
survive beyond user session.

I understand there might be a perception that anything registered in 
HiveCatalog should be persisted. We know this isn't true for Hive's temporary 
tables and functions. I think we just need to educate users that certain tables 
(such as inline tables) are temporary in nature and valid only in the current 
session, which was already true before HiveCatalog was introduced.

Please let me know if there are more questions or comments.



> Support ConnectorCatalogTable in HiveCatalog
> 
>
> Key: FLINK-12771
> URL: https://issues.apache.org/jira/browse/FLINK-12771
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time S

[jira] [Commented] (FLINK-12771) Support ConnectorCatalogTable in HiveCatalog

2019-06-14 Thread Xuefu Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864356#comment-16864356
 ] 

Xuefu Zhang commented on FLINK-12771:
-

Hi [~dawidwys], 

Re: #2, communicating with Hive metastore is the main reason for having 
HiveCatalog, but it doesn't necessarily mean that's all it can do. In Hive, a 
user can create temporary tables/functions via DDL, which requires no 
communication with the Hive metastore. Such session-specific objects are stored 
in the user session (or client). (In fact, the Hive metastore provides a type of 
client that has this functionality built-in.) Those objects share the same 
namespace with the persistent objects stored in the Hive metastore. I'd argue 
that Flink's temp objects such as inline tables have the same nature and can be 
handled in a similar way. I agree this isn't the only solution. Storing them in 
a dedicated, in-memory catalog is one possibility, and having a session-specific 
structure holding them is another. We feel extending the capability of 
HiveCatalog to share and store them has greater advantages.

Re: #3, persistence is just one aspect of the store, but we have decoupled the 
types of the objects to be stored from the nature of the catalog. This happened 
when we hosted both Flink tables and Hive tables via HiveCatalog. That is, a 
catalog may store any type of table. The work here is just an extension to 
that. Along the same idea, we shouldn't stop a user from defining either a Hive 
table, a generic table, or an inline table in an in-memory catalog. If a Hive 
table is correctly defined in an in-memory catalog, Flink has no problem 
reading from or writing to that table. The difference is that the table 
definition doesn't survive beyond the user session.

I understand there might be a perception that anything registered in 
HiveCatalog should be persisted. We know this isn't true for Hive's temporary 
tables and functions. I think we just need to educate users that certain tables 
(such as inline tables) are temporary in nature and valid only in the current 
session, which was already true before HiveCatalog was introduced.

Please let me know if there are more questions or comments.



> Support ConnectorCatalogTable in HiveCatalog
> 
>
> Key: FLINK-12771
> URL: https://issues.apache.org/jira/browse/FLINK-12771
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently {{HiveCatalog}} does not support {{ConnectorCatalogTable}}. There's 
> a major drawback to this when it comes to real use cases: when Table 
> API users set a {{HiveCatalog}} as their default catalog (which is very 
> likely), they can no longer create or use any inline table sources/sinks with 
> their default catalog. That's really inconvenient for Table API users who use 
> Flink for exploration, experimentation, and production.
> There are several workarounds in this case. E.g. users can switch their 
> default catalog, but that misses our original intention of having a default 
> {{HiveCatalog}}; or users can register their inline sources/sinks in Flink's 
> default catalog, which is an in-memory catalog, but that not only requires 
> users to type the full path of a table but also requires them to be aware of 
> Flink's default catalog, default db, and their names. In short, none of the 
> workarounds seems reasonable or user friendly.
> From another perspective, Hive has the concept of temporary tables that are 
> stored in the memory of the Hive metastore client and are removed when the 
> client is shut down. In Flink, {{ConnectorCatalogTable}} can be seen as a type 
> of session-based temporary table, and {{HiveCatalog}} (potentially any catalog 
> implementation) can store it in memory. By introducing the concept of temp 
> tables, we could greatly reduce friction for users and improve their 
> experience and productivity.
> Thus, we propose adding a simple in-memory map for {{ConnectorCatalogTable}} 
> in {{HiveCatalog}} to allow users to create and use inline sources/sinks when 
> their default catalog is a {{HiveCatalog}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 commented on a change in pull request #8703: [FLINK-12807][hive]Support Hive table columnstats related operations in HiveCatalog

2019-06-14 Thread GitBox
bowenli86 commented on a change in pull request #8703: 
[FLINK-12807][hive]Support Hive table columnstats related operations in 
HiveCatalog
URL: https://github.com/apache/flink/pull/8703#discussion_r293925009
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -1074,15 +1074,14 @@ public void alterTableStatistics(ObjectPath tablePath, 
CatalogTableStatistics ta
}
 
@Override
-   public void alterTableColumnStatistics(ObjectPath tablePath, 
CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws 
TableNotExistException, CatalogException {
+   public void alterTableColumnStatistics(ObjectPath tablePath, 
CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws 
TableNotExistException, CatalogException, TableNotPartitionedException {
try {
Table hiveTable = getHiveTable(tablePath);
// Set table column stats. This only works for 
non-partitioned tables.
if (!isTablePartitioned(hiveTable)) {
-   
client.updateTableColumnStatistics(HiveCatalogUtil.createTableColumnStats(hiveTable,
 columnStatistics.getColumnStatisticsData()));
+   
client.updateTableColumnStatistics(HiveStatsUtil.createTableColumnStats(hiveTable,
 columnStatistics.getColumnStatisticsData()));
} else {
-   throw new 
CatalogException(String.format("Failed to alter partition table column stats of 
table %s",
-   
tablePath.getFullName()));
+   throw new 
TableNotPartitionedException(getName(), tablePath);
 
 Review comment:
   shouldn't this be a `TablePartitionedException`, not 
`TableNotPartitionedException`?
   
   we need to add a `TablePartitionedException.class`
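
A rough sketch of what such an exception class could look like, modeled on the 
existing catalog exceptions; the package, message format, and constructor 
overloads here are assumptions, not the committed code.

```java
package org.apache.flink.table.catalog.exceptions;

import org.apache.flink.table.catalog.ObjectPath;

/** Exception for operating on a partitioned table as if it were non-partitioned (sketch only). */
public class TablePartitionedException extends Exception {

    private static final String MSG = "Table %s in catalog %s is partitioned.";

    public TablePartitionedException(String catalogName, ObjectPath tablePath) {
        this(catalogName, tablePath, null);
    }

    public TablePartitionedException(String catalogName, ObjectPath tablePath, Throwable cause) {
        super(String.format(MSG, tablePath.getFullName(), catalogName), cause);
    }
}
```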


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8703: [FLINK-12807][hive]Support Hive table columnstats related operations in HiveCatalog

2019-06-14 Thread GitBox
bowenli86 commented on a change in pull request #8703: 
[FLINK-12807][hive]Support Hive table columnstats related operations in 
HiveCatalog
URL: https://github.com/apache/flink/pull/8703#discussion_r293926968
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -1116,11 +1117,14 @@ public void alterPartitionColumnStatistics(ObjectPath 
tablePath, CatalogPartitio
private String getPartitionName(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec, Table hiveTable) throws 
PartitionSpecInvalidException {
List partitionCols = 
getFieldNames(hiveTable.getPartitionKeys());
List partitionVals = 
getOrderedFullPartitionValues(partitionSpec, partitionCols, tablePath);
-   List partKVs = new ArrayList<>();
+   StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < partitionCols.size(); i++) {
-   partKVs.add(partitionCols.get(i) + "=" + 
partitionVals.get(i));
+   
stringBuilder.append(partitionCols.get(i)).append("=").append(partitionVals.get(i));
+   if (i < partitionCols.size() - 1) {
+   stringBuilder.append("/");
+   }
}
-   return org.apache.commons.lang3.StringUtils.join(partKVs, "/");
 
 Review comment:
   we can keep the `List partKVs` and just call `return 
String.join("/", partKVs)` at the end
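
A small standalone illustration of the suggestion, with hypothetical column and 
value inputs (the real method resolves them from the Hive table): `String.join` 
removes the manual separator bookkeeping of the `StringBuilder` version.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionNameDemo {

    // Builds e.g. "year=2019/month=06/day=14" from partition columns and values.
    static String partitionName(List<String> cols, List<String> vals) {
        List<String> partKVs = new ArrayList<>();
        for (int i = 0; i < cols.size(); i++) {
            partKVs.add(cols.get(i) + "=" + vals.get(i));
        }
        return String.join("/", partKVs);
    }

    public static void main(String[] args) {
        System.out.println(partitionName(
                Arrays.asList("year", "month", "day"),
                Arrays.asList("2019", "06", "14")));
    }
}
```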


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8703: [FLINK-12807][hive]Support Hive table columnstats related operations in HiveCatalog

2019-06-14 Thread GitBox
bowenli86 commented on a change in pull request #8703: 
[FLINK-12807][hive]Support Hive table columnstats related operations in 
HiveCatalog
URL: https://github.com/apache/flink/pull/8703#discussion_r293927620
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
 ##
 @@ -162,38 +148,44 @@ private static CatalogColumnStatisticsDataBase 
createTableColumnStats(DataType c
} else if (stats.isSetDoubleStats()) {
DoubleColumnStatsData doubleStats = 
stats.getDoubleStats();
return new CatalogColumnStatisticsDataDouble(
-   doubleStats.getLowValue(), 
doubleStats.getHighValue(),
-   doubleStats.getNumDVs(), 
doubleStats.getNumNulls());
+   doubleStats.getLowValue(),
+   doubleStats.getHighValue(),
+   doubleStats.getNumDVs(),
+   doubleStats.getNumNulls());
} else if (stats.isSetLongStats()) {
LongColumnStatsData longColStats = 
stats.getLongStats();
return new CatalogColumnStatisticsDataLong(
-   longColStats.getLowValue(), 
longColStats.getHighValue(),
-   longColStats.getNumDVs(), 
longColStats.getNumNulls());
+   longColStats.getLowValue(),
+   longColStats.getHighValue(),
+   longColStats.getNumDVs(),
+   longColStats.getNumNulls());
} else if (stats.isSetStringStats()) {
StringColumnStatsData stringStats = 
stats.getStringStats();
return new CatalogColumnStatisticsDataString(
-   stringStats.getMaxColLen(), 
stringStats.getAvgColLen(),
-   stringStats.getNumDVs(), 
stringStats.getNumNulls());
+   stringStats.getMaxColLen(),
+   stringStats.getAvgColLen(),
+   stringStats.getNumDVs(),
+   stringStats.getNumNulls());
} else {
LOG.warn("Flink does not support converting 
ColumnStatisticsData '{}' for Hive column type '{}' yet.", stats, colType);
return null;
}
}
 
-
/**
 * Convert Flink ColumnStats to Hive ColumnStatisticsData according to 
Hive column type.
 * Note we currently assume that, in Flink, the max and min of 
ColumnStats will be same type as the Flink column type.
 * For example, for SHORT and Long columns, the max and min of their 
ColumnStats should be of type SHORT and LONG.
 */
private static ColumnStatisticsData getColumnStatisticsData(DataType 
colType, CatalogColumnStatisticsDataBase colStat) {
-   LogicalType colLogicalType = colType.getLogicalType();
-   if (colLogicalType instanceof CharType || colLogicalType 
instanceof VarCharType) {
+   LogicalTypeRoot type = colType.getLogicalType().getTypeRoot();
+   if (type.equals(LogicalTypeRoot.CHAR)
+   || type.equals(LogicalTypeRoot.VARCHAR)) {
 
 Review comment:
   nit: one extra tab here


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12836) Allow retained checkpoints to be persisted on success

2019-06-14 Thread Andrew Duffy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864343#comment-16864343
 ] 

Andrew Duffy commented on FLINK-12836:
--

Hey, yea the intention is to allow checkpoints to be reused after a successful 
completion of a job. The scenarios where this comes up for us:
 * Recurring bounded stream processing job, which can be re-run multiple times 
and pick up from where the previous one left off with state/offsets intact.
 * Hybrid storage: bootstrapping state by running over the complete set of 
historical data, then restoring that state to start reading realtime data.

For both of these, it's considerably easier for an outside service (or human) 
to orchestrate around job completions than trying to properly time a 
cancellation, particularly when you're already trying to read one source to 
completion.

 

> Allow retained checkpoints to be persisted on success
> -
>
> Key: FLINK-12836
> URL: https://issues.apache.org/jira/browse/FLINK-12836
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Andrew Duffy
>Assignee: vinoyang
>Priority: Major
>
> Currently, retained checkpoints are persisted with one of 3 strategies:
>  * CHECKPOINT_NEVER_RETAINED: Retained checkpoints are never persisted
>  * CHECKPOINT_RETAINED_ON_FAILURE: The latest retained checkpoint is persisted 
> in the face of job failures
>  * CHECKPOINT_RETAINED_ON_CANCELLATION: The latest retained checkpoint is 
> persisted when the job is canceled externally (e.g. via the REST API)
>  
> I'm proposing a fourth persistence mode: _CHECKPOINT_RETAINED_ALWAYS_. This 
> mode would ensure that retained checkpoints are retained on successful 
> completion of the job and can be resumed from later.
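
For context, this is how externalized checkpoint retention is configured in the 
DataStream API today; the proposed mode appears only as a commented-out 
placeholder, since no such enum value exists in Flink at this point and its 
name is an assumption.

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointConfig {

    static void configure(StreamExecutionEnvironment env) {
        env.enableCheckpointing(60_000);

        // Existing behavior: keep the latest checkpoint when the job is cancelled.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // The proposal would add a mode that also retains the checkpoint on
        // successful completion, e.g. a hypothetical value such as:
        // CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ALWAYS  (not in Flink yet)
    }
}
```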



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12647) Make partition release on consumption optional

2019-06-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-12647:
-
Description: ResultPartitions being released on consumption should no 
longer be a hard-coded behavior, which is a prerequisite for making partitions 
consumable multiple times without knowing ahead of time how often that might 
be. To retain the existing behavior, a configurable flag should be introduced to 
force the release of all consumed partitions, which will likely be removed in 
the future.  (was: Add a feature flag so that the network continues to 
automatically release partitions once they have been consumed, to ease 
development and potentially as a failsafe in the future.)

> Make partition release on consumption optional
> --
>
> Key: FLINK-12647
> URL: https://issues.apache.org/jira/browse/FLINK-12647
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> ResultPartitions being released on consumption should no longer be a hard-coded 
> behavior, which is a prerequisite for making partitions consumable multiple 
> times without knowing ahead of time how often that might be. To retain the 
> existing behavior, a configurable flag should be introduced to force the 
> release of all consumed partitions, which will likely be removed in the 
> future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12647) Make partition release on consumption optional

2019-06-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-12647:
-
Summary: Make partition release on consumption optional  (was: Add feature 
flag to disable automatic release of partitions)

> Make partition release on consumption optional
> --
>
> Key: FLINK-12647
> URL: https://issues.apache.org/jira/browse/FLINK-12647
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Add a feature flag so that the network continues to automatically release 
> partitions once they have been consumed, to ease development and potentially 
> as a failsafe in the future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12647) Make partition release on consumption optional

2019-06-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-12647.

Resolution: Fixed

master: d9ac854a4155d14ffd9d81f883f796231b8e55e5

> Make partition release on consumption optional
> --
>
> Key: FLINK-12647
> URL: https://issues.apache.org/jira/browse/FLINK-12647
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Add a feature flag so that the network continues to automatically release 
> partitions once they have been consumed, to ease development and potentially 
> as a failsafe in the future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zentol merged pull request #8654: [FLINK-12647][network] Add feature flag to disable release of consumed blocking partitions

2019-06-14 Thread GitBox
zentol merged pull request #8654: [FLINK-12647][network] Add feature flag to 
disable release of consumed blocking partitions
URL: https://github.com/apache/flink/pull/8654
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12851) Add travis profile for gelly/kafka

2019-06-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-12851.

Resolution: Fixed

master: b63c0f16dc0cd727d30459ee4d450662e47fe5f0

> Add travis profile for gelly/kafka
> --
>
> Key: FLINK-12851
> URL: https://issues.apache.org/jira/browse/FLINK-12851
> Project: Flink
>  Issue Type: Improvement
>  Components: Travis
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The misc/tests profiles frequently hit timeouts; we can resolve this by 
> moving gelly and the universal kafka connector into a separate profile.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zentol merged pull request #8743: [FLINK-12851][travis] Move gelly/kafka to separate profile

2019-06-14 Thread GitBox
zentol merged pull request #8743: [FLINK-12851][travis] Move gelly/kafka to 
separate profile 
URL: https://github.com/apache/flink/pull/8743
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-12771) Support ConnectorCatalogTable in HiveCatalog

2019-06-14 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864321#comment-16864321
 ] 

Bowen Li edited comment on FLINK-12771 at 6/14/19 6:15 PM:
---

Hi [~dawidwys], here is the [spec for Hive temp tables 
|https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-TemporaryTables].
 Basically the temporary objects are held by the Hive metastore client (client 
side), rather than the Hive metastore service (server side).

Re: 2. IMHO, HiveCatalog offers the capability of persisting metadata, but we 
shouldn't limit ourselves to that, considering the use cases and experience we 
want to support.

Re: 3/4. We should put good, clear documentation in place to educate users that 
only the new {{CatalogTable}} can be persisted, as well as logging for 
applications and a noticeable reminder for interactive consoles. 

Note that this only affects Table API users, not SQL CLI users. Table API users 
are typically more experienced and advanced Flink users, and it won't be hard 
for them to learn that all inline tables are not persistent in 1.8 or older 
versions, and they will remain so in 1.9 and beyond.
 


was (Author: phoenixjiangnan):
Hi [~dawidwys], here is the [spec for Hive temp tables 
|https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-TemporaryTables].
 Basically the temporary objects are held by the Hive metastore client (client 
side), rather than the Hive metastore service (server side).

Re: 2. IMHO, HiveCatalog offers the capability of persisting metadata, but we 
shouldn't limit ourselves to that, considering the use cases and experience we 
want to support.

Re: 3/4. We should put good, clear documentation in place to educate users that 
only the new {{CatalogTable}} can be persisted, as well as logging for 
applications and a noticeable reminder for interactive consoles. 

Note that this only affects Table API users, not SQL CLI users. Table API users 
are often more experienced in Flink, and it won't be hard for them to learn 
that all inline tables are not persistent in 1.8 or older versions, and they 
will remain so in 1.9 and beyond.
 

> Support ConnectorCatalogTable in HiveCatalog
> 
>
> Key: FLINK-12771
> URL: https://issues.apache.org/jira/browse/FLINK-12771
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently {{HiveCatalog}} does not support {{ConnectorCatalogTable}}. There's 
> a major drawback to this when it comes to real use cases: when Table 
> API users set a {{HiveCatalog}} as their default catalog (which is very 
> likely), they can no longer create or use any inline table sources/sinks with 
> their default catalog. That's really inconvenient for Table API users who use 
> Flink for exploration, experimentation, and production.
> There are several workarounds in this case. E.g. users can switch their 
> default catalog, but that misses our original intention of having a default 
> {{HiveCatalog}}; or users can register their inline sources/sinks in Flink's 
> default catalog, which is an in-memory catalog, but that not only requires 
> users to type the full path of a table but also requires them to be aware of 
> Flink's default catalog, default db, and their names. In short, none of the 
> workarounds seems reasonable or user friendly.
> From another perspective, Hive has the concept of temporary tables that are 
> stored in the memory of the Hive metastore client and are removed when the 
> client is shut down. In Flink, {{ConnectorCatalogTable}} can be seen as a type 
> of session-based temporary table, and {{HiveCatalog}} (potentially any catalog 
> implementation) can store it in memory. By introducing the concept of temp 
> tables, we could greatly reduce friction for users and improve their 
> experience and productivity.
> Thus, we propose adding a simple in-memory map for {{ConnectorCatalogTable}} 
> in {{HiveCatalog}} to allow users to create and use inline sources/sinks when 
> their default catalog is a {{HiveCatalog}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12771) Support ConnectorCatalogTable in HiveCatalog

2019-06-14 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864321#comment-16864321
 ] 

Bowen Li commented on FLINK-12771:
--

Hi [~dawidwys], here is the [spec for Hive temp tables 
|https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-TemporaryTables].
 Basically the temporary objects are held by the Hive metastore client (client 
side), rather than the Hive metastore service (server side).

Re: 2. IMHO, HiveCatalog offers the capability of persisting metadata, but we 
shouldn't limit ourselves to that, considering the use cases and experience we 
want to support.

Re: 3/4. We should put good, clear documentation in place to educate users that 
only the new {{CatalogTable}} can be persisted, as well as logging for 
applications and a noticeable reminder for interactive consoles. 

Note that this only affects Table API users, not SQL CLI users. Table API users 
are often more experienced in Flink, and it won't be hard for them to learn 
that all inline tables are not persistent in 1.8 or older versions, and they 
will remain so in 1.9 and beyond.
 

> Support ConnectorCatalogTable in HiveCatalog
> 
>
> Key: FLINK-12771
> URL: https://issues.apache.org/jira/browse/FLINK-12771
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently {{HiveCatalog}} does not support {{ConnectorCatalogTable}}. There's 
> a major drawback to this when it comes to real use cases: when Table 
> API users set a {{HiveCatalog}} as their default catalog (which is very 
> likely), they can no longer create or use any inline table sources/sinks with 
> their default catalog. That's really inconvenient for Table API users who use 
> Flink for exploration, experimentation, and production.
> There are several workarounds in this case. E.g. users can switch their 
> default catalog, but that misses our original intention of having a default 
> {{HiveCatalog}}; or users can register their inline sources/sinks in Flink's 
> default catalog, which is an in-memory catalog, but that not only requires 
> users to type the full path of a table but also requires them to be aware of 
> Flink's default catalog, default db, and their names. In short, none of the 
> workarounds seems reasonable or user friendly.
> From another perspective, Hive has the concept of temporary tables that are 
> stored in the memory of the Hive metastore client and are removed when the 
> client is shut down. In Flink, {{ConnectorCatalogTable}} can be seen as a type 
> of session-based temporary table, and {{HiveCatalog}} (potentially any catalog 
> implementation) can store it in memory. By introducing the concept of temp 
> tables, we could greatly reduce friction for users and improve their 
> experience and productivity.
> Thus, we propose adding a simple in-memory map for {{ConnectorCatalogTable}} 
> in {{HiveCatalog}} to allow users to create and use inline sources/sinks when 
> their default catalog is a {{HiveCatalog}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz commented on issue #8720: [FLINK-12771][hive] Support ConnectorCatalogTable in HiveCatalog

2019-06-14 Thread GitBox
xuefuz commented on issue #8720: [FLINK-12771][hive] Support 
ConnectorCatalogTable in HiveCatalog
URL: https://github.com/apache/flink/pull/8720#issuecomment-502209972
 
 
   Also, since we are using a map to store the temp tables with the combination 
of database name and table name as the key, I'm wondering what impact database 
operations, such as dropping a database, might have.
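
A hypothetical sketch of the interaction being asked about (not the PR's actual 
code): if temp tables are keyed by "db.table" in a session-local map, a 
database-level drop arguably needs to evict the matching temp entries as well.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TempTableMapSketch {

    // Session-local temp tables keyed by "db.table" (illustrative key scheme).
    private final Map<String, Object> tempTables = new ConcurrentHashMap<>();

    public void registerTempTable(String db, String table, Object tableDefinition) {
        tempTables.put(db + "." + table, tableDefinition);
    }

    public void dropDatabase(String db) {
        // Evict temp tables of the dropped database so they don't linger under
        // a name that no longer resolves; then proceed with the metastore drop.
        tempTables.keySet().removeIf(key -> key.startsWith(db + "."));
    }
}
```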


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] banmoy commented on issue #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable

2019-06-14 Thread GitBox
banmoy commented on issue #8611: [FLINK-12693][state] Store state per key-group 
in CopyOnWriteStateTable
URL: https://github.com/apache/flink/pull/8611#issuecomment-502176398
 
 
   @StefanRRichter I have addressed the comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable

2019-06-14 Thread GitBox
banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store 
state per key-group in CopyOnWriteStateTable
URL: https://github.com/apache/flink/pull/8611#discussion_r293880412
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
 ##
 @@ -172,14 +232,133 @@ public boolean isEmpty() {
 * @return the state of the mapping with the specified key/namespace 
composite key, or {@code null}
 * if no mapping for the specified key is found.
 */
-   public abstract S get(K key, N namespace);
+   public S get(K key, N namespace) {
+   int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, 
keyContext.getNumberOfKeyGroups());
+   return get(key, keyGroup, namespace);
+   }
+
+   public Stream getKeys(N namespace) {
+   return Arrays.stream(state)
+   .filter(Objects::nonNull)
+   .map(StateMap::iterator)
+   .flatMap(iter -> 
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iter, 0), false))
 
 Review comment:
   Yes, we can create a spliterator with a known size as you say. But in this 
case, we will not iterate keys in parallel, so the size will not be used. On 
the other hand, the size for `NestedStateMap` needs some computation. So I think 
there is no need to get the size of the spliterator.
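
For reference, a small standalone comparison of the two `Spliterators` factory 
methods discussed here; as noted, the size mainly helps parallel splitting, so 
the unknown-size variant avoids computing a size that sequential iteration 
never uses.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.StreamSupport;

public class SpliteratorSizeDemo {

    public static void main(String[] args) {
        // Unknown-size variant (as in the PR): no size estimate required.
        Iterator<String> it1 = Arrays.asList("a", "b", "c").iterator();
        long n1 = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(it1, 0), false).count();

        // Known-size variant: the size is only exploited when splitting for parallelism.
        Iterator<String> it2 = Arrays.asList("a", "b", "c").iterator();
        long n2 = StreamSupport.stream(
                Spliterators.spliterator(it2, 3, Spliterator.SIZED), false).count();

        System.out.println(n1 + " " + n2); // 3 3
    }
}
```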


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8225: [FLINK-10984]Remove flink-shaded-hadoop from flink

2019-06-14 Thread GitBox
sunjincheng121 commented on issue #8225: [FLINK-10984]Remove 
flink-shaded-hadoop from flink
URL: https://github.com/apache/flink/pull/8225#issuecomment-502163792
 
 
   I appreciate if you can have another look :) @zentol 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Xpray commented on a change in pull request #8346: [FLINK-12405] [DataSet] Introduce BLOCKING_PERSISTENT result partition type

2019-06-14 Thread GitBox
Xpray commented on a change in pull request #8346: [FLINK-12405] [DataSet] 
Introduce BLOCKING_PERSISTENT result partition type
URL: https://github.com/apache/flink/pull/8346#discussion_r293862114
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
 ##
 @@ -49,13 +59,17 @@
/** Does this partition use a limited number of (network) buffers? */
private final boolean isBounded;
 
+   /** This partition will not be released after consuming if 
'isPersistent' is true. */
+   private final boolean isPersistent;
 
 Review comment:
   It seems "external" might be better than "global" since "internal" is better 
than "local"?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen commented on a change in pull request #8346: [FLINK-12405] [DataSet] Introduce BLOCKING_PERSISTENT result partition type

2019-06-14 Thread GitBox
StephanEwen commented on a change in pull request #8346: [FLINK-12405] 
[DataSet] Introduce BLOCKING_PERSISTENT result partition type
URL: https://github.com/apache/flink/pull/8346#discussion_r293857231
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
 ##
 @@ -49,13 +59,17 @@
/** Does this partition use a limited number of (network) buffers? */
private final boolean isBounded;
 
+   /** This partition will not be released after consuming if 
'isPersistent' is true. */
+   private final boolean isPersistent;
 
 Review comment:
   I agree that BLOCKING_PERSISTENT is not the best name.
   The specific thing is that it lives across jobs, but I am not sure what a 
good name for that would be.
   "global" and "external" both go in that direction...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunhaibotb commented on issue #8642: [FLINK-1722][datastream] Enable the InitializeOnMaster and FinalizeOnMaster interfaces on datastream

2019-06-14 Thread GitBox
sunhaibotb commented on issue #8642: [FLINK-1722][datastream] Enable the 
InitializeOnMaster and FinalizeOnMaster interfaces on datastream
URL: https://github.com/apache/flink/pull/8642#issuecomment-502151128
 
 
   Thank you for reviewing @aljoscha . Because `JobMasterTest.Java` conflicts 
with the latest master, I will rebase on top of the latest master and force 
push after addressing the comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable

2019-06-14 Thread GitBox
banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store 
state per key-group in CopyOnWriteStateTable
URL: https://github.com/apache/flink/pull/8611#discussion_r293850537
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
 ##
 @@ -156,10 +203,23 @@ public boolean isEmpty() {
 * @param transformation the transformation function.
 * @throws Exception if some exception happens in the transformation 
function.
 */
-   public abstract  void transform(
+   public  void transform(
N namespace,
T value,
-   StateTransformationFunction transformation) 
throws Exception;
+   StateTransformationFunction transformation) 
throws Exception {
+   K key = keyContext.getCurrentKey();
+   checkKeyNamespacePreconditions(key, namespace);
+
+   int keyGroup = keyContext.getCurrentKeyGroupIndex();
+   StateMap stateMap = getMapForKeyGroup(keyGroup);
+
+   if (stateMap == null) {
+   stateMap = createStateMap();
 
 Review comment:
   The style of checking `null` just follows the original 
NestedMapsStateTable. For the follow-up spill implementation, we just replace 
the on-heap map with an on-disk map instead of removing the map from the array, 
so there is no need to set `null` in the array. I agree with you that no map 
will stay `null` for long if the number of keys is larger than the number of 
key-groups and the keys are uniformly distributed, which I think is the most 
common case. So I will initialize all array positions with maps and remove the 
check. 
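
A simplified sketch of the eager initialization described above (names are 
placeholders, not the actual StateTable code): every key-group slot gets a map 
up front, so reads and writes need no null branch.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyGroupTableSketch<K, V> {

    private final Map<K, V>[] state;

    @SuppressWarnings("unchecked")
    public KeyGroupTableSketch(int numberOfKeyGroups) {
        // Eager initialization: one map per key-group slot from the start.
        state = new Map[numberOfKeyGroups];
        for (int i = 0; i < numberOfKeyGroups; i++) {
            state[i] = new HashMap<>();
        }
    }

    public V get(int keyGroup, K key) {
        return state[keyGroup].get(key);   // no null check on the slot needed
    }

    public void put(int keyGroup, K key, V value) {
        state[keyGroup].put(key, value);   // no lazy-creation branch needed
    }
}
```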


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8744: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function

2019-06-14 Thread GitBox
flinkbot commented on issue #8744: [FLINK-12297]Harden ClosureCleaner to handle 
the wrapped function
URL: https://github.com/apache/flink/pull/8744#issuecomment-502145656
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] aljoscha opened a new pull request #8744: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function

2019-06-14 Thread GitBox
aljoscha opened a new pull request #8744: [FLINK-12297]Harden ClosureCleaner to 
handle the wrapped function
URL: https://github.com/apache/flink/pull/8744
 
 
   Copy of #8280 for running CI


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable

2019-06-14 Thread GitBox
banmoy commented on a change in pull request #8611: [FLINK-12693][state] Store 
state per key-group in CopyOnWriteStateTable
URL: https://github.com/apache/flink/pull/8611#discussion_r293838949
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java
 ##
 @@ -19,34 +19,123 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.StateEntry;
 import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
 /**
  * Abstract base class for snapshots of a {@link StateTable}. Offers a way to 
serialize the snapshot (by key-group).
  * All snapshots should be released after usage.
  */
 @Internal
-abstract class AbstractStateTableSnapshot> implements StateSnapshot {
+abstract class AbstractStateTableSnapshot>
+   implements StateSnapshot, StateSnapshot.StateKeyGroupWriter {
 
/**
 * The {@link StateTable} from which this snapshot was created.
 */
-   final T owningStateTable;
+   protected final T owningStateTable;
+
+   /**
+* A local duplicate of the table's key serializer.
+*/
+   @Nonnull
+   protected final TypeSerializer localKeySerializer;
+
+   /**
+* A local duplicate of the table's namespace serializer.
+*/
+   @Nonnull
+   protected final TypeSerializer localNamespaceSerializer;
+
+   /**
+* A local duplicate of the table's state serializer.
+*/
+   @Nonnull
+   protected final TypeSerializer localStateSerializer;
+
+   @Nullable
+   protected final StateSnapshotTransformer stateSnapshotTransformer;
 
/**
 * Creates a new {@link AbstractStateTableSnapshot} for and owned by 
the given table.
 *
 * @param owningStateTable the {@link StateTable} for which this object 
represents a snapshot.
 */
-   AbstractStateTableSnapshot(T owningStateTable) {
+   AbstractStateTableSnapshot(
+   T owningStateTable,
+   TypeSerializer localKeySerializer,
 
 Review comment:
   > The annotation also generates an assertion check for non-null if the code 
is executed with -ea
   
   I am confused about this. I used a simple piece of code to test the assertion, 
but nothing happened regardless of whether `-ea` was added. The code is as 
follows; the version of the jsr305 jar is 1.3.9, which is the same as Flink 
uses, and the Java version is 1.8.0_162.
   ```Java
   import javax.annotation.Nonnull;
   
   public class AnnotationTest {
   
public static void main(String[] args) {
func(null);
}
   
private static void func(@Nonnull Object object) {
System.out.println("success");
}
   }
   ```
   But I find IntelliJ has a setting `Add runtime assertions for not-null 
annotated methods and parameters`, and if I run the code in IntelliJ with this 
setting enabled, an `IllegalArgumentException` is thrown.
   Please let me know if I'm wrong. 
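
A side note on the behavior observed above: `javax.annotation.Nonnull` (JSR-305) 
is metadata only, so plain javac emits no runtime check for it and `-ea` has no 
effect; the assertion only appears when something instruments the bytecode, such 
as the IntelliJ setting mentioned. An explicit precondition is the portable way 
to fail fast (Flink's own code typically uses `Preconditions.checkNotNull`); a 
minimal sketch:

```java
import java.util.Objects;

import javax.annotation.Nonnull;

public class NonnullCheckDemo {

    // The annotation documents the contract; the explicit check enforces it at
    // runtime regardless of compiler flags or IDE instrumentation.
    private static void func(@Nonnull Object object) {
        Objects.requireNonNull(object, "object must not be null");
        System.out.println("success");
    }

    public static void main(String[] args) {
        func(new Object()); // prints "success"
        func(null);         // throws NullPointerException
    }
}
```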


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] asfgit closed pull request #8623: [FLINK-12719][python] Add the Python catalog API

2019-06-14 Thread GitBox
asfgit closed pull request #8623: [FLINK-12719][python] Add the Python catalog 
API
URL: https://github.com/apache/flink/pull/8623
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-14 Thread GitBox
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add 
shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r293830121
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java
 ##
 @@ -69,20 +133,11 @@
 *  evaluated as illegal by the validator
 */
public  Params set(ParamInfo info, V value) {
-   if (!info.isOptional() && value == null) {
-   throw new RuntimeException(
-   "Setting " + info.getName() + " as null while 
it's not a optional param");
-   }
-   if (value == null) {
-   remove(info);
-   return this;
-   }
-
if (info.getValidator() != null && 
!info.getValidator().validate(value)) {
throw new RuntimeException(
"Setting " + info.getName() + " as a invalid 
value:" + value);
}
-   paramMap.put(info.getName(), value);
+   params.put(info.getName(), valueToJson(value));
 
 Review comment:
   If we change a ParamInfo name, we can add the old name to its aliases; thus, 
when we read params from old json, we can still get the param value.
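
A small self-contained sketch of the alias fallback described here, with 
hypothetical helper and key names (the real lookup would live inside 
`Params`/`ParamInfo`):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AliasLookupSketch {

    // Resolve a parameter stored under its current name or any historical alias.
    static String resolve(Map<String, String> json, String name, List<String> aliases) {
        if (json.containsKey(name)) {
            return json.get(name);
        }
        for (String alias : aliases) {
            if (json.containsKey(alias)) {
                return json.get(alias);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, String> oldJson = new HashMap<>();
        oldJson.put("maxIter", "100");                       // serialized under the old name
        System.out.println(resolve(oldJson, "maxIteration",  // current name
                Arrays.asList("maxIter")));                  // -> "100" via the alias
    }
}
```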


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12719) Add the Python catalog API

2019-06-14 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-12719.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in master: e087c1e803b1535dbb07873c3ddaa1a214b2c849

> Add the Python catalog API
> --
>
> Key: FLINK-12719
> URL: https://issues.apache.org/jira/browse/FLINK-12719
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The new catalog API is almost ready. We should add the corresponding Python 
> catalog API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12619) Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-14 Thread Congxian Qiu(klion26) (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864111#comment-16864111
 ] 

Congxian Qiu(klion26) commented on FLINK-12619:
---

Thanks for the doc [~carp84], and the confirmation [~aljoscha], I'll continue 
to complete the pr :)

> Support TERMINATE/SUSPEND Job with Checkpoint
> -
>
> Key: FLINK-12619
> URL: https://issues.apache.org/jira/browse/FLINK-12619
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / State Backends
>Reporter: Congxian Qiu(klion26)
>Assignee: Congxian Qiu(klion26)
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Inspired by the idea of FLINK-11458, we propose to support terminating/suspending 
> a job with a checkpoint. This improvement cooperates with the incremental and 
> external checkpoint features: if checkpoint retention is enabled and this feature 
> is configured, we will trigger a checkpoint before the job stops. It could 
> accelerate job recovery a lot since:
> 1. No source rewinding is required any more.
> 2. It's much faster than taking a savepoint since incremental checkpointing is 
> enabled.
> Please note that conceptually savepoints are different from checkpoints in a 
> similar way that backups are different from recovery logs in traditional 
> database systems. So we suggest using this feature only for job recovery, while 
> sticking with FLINK-11458 for the 
> upgrading/cross-cluster-job-migration/state-backend-switch cases.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-14 Thread GitBox
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add 
shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r293826982
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java
 ##
 @@ -114,38 +171,104 @@ public Params clone() {
 * @return a json containing all parameters in this Params
 */
public String toJson() {
-   ObjectMapper mapper = new ObjectMapper();
-   Map stringMap = new HashMap<>();
try {
-   for (Map.Entry e : paramMap.entrySet()) 
{
-   stringMap.put(e.getKey(), 
mapper.writeValueAsString(e.getValue()));
-   }
-   return mapper.writeValueAsString(stringMap);
+   return mapper.writeValueAsString(params);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize params 
to json", e);
}
}
 
/**
 * Restores the parameters from the given json. The parameters should 
be exactly the same with
-* the one who was serialized to the input json after the restoration. 
The class mapping of the
-* parameters in the json is required because it is hard to directly 
restore a param of a user
-* defined type. Params will be treated as String if it doesn't exist 
in the {@code classMap}.
+* the one who was serialized to the input json after the restoration.
 *
-* @param json the json String to restore from
-* @param classMap the classes of the parameters contained in the json
+* @param json the json String to restore from
 */
@SuppressWarnings("unchecked")
-   public void loadJson(String json, Map> classMap) {
+   public void loadJson(String json) {
ObjectMapper mapper = new ObjectMapper();
+   Map params;
try {
-   Map m = mapper.readValue(json, 
Map.class);
-   for (Map.Entry e : m.entrySet()) {
-   Class valueClass = 
classMap.getOrDefault(e.getKey(), String.class);
-   paramMap.put(e.getKey(), 
mapper.readValue(e.getValue(), valueClass));
+   params = mapper.readValue(json, Map.class);
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to deserialize 
json:" + json, e);
+   }
+   this.params.clear();
+   this.params.putAll(params);
+   }
+
+   /**
+* Factory method for constructing params.
+*
+* @param json the json string to load
+* @return the {@code Params} loaded from the json string.
+*/
+   public static Params fromJson(String json) {
 
 Review comment:
   We have redefined `loadJson()`, and the new static factory method `fromJson()` 
makes it easy to create a new Params from a json string.
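
   As a side note for reviewers, here is a minimal sketch of the resulting round trip. It only uses methods visible in this diff (`toJson`, `loadJson`, `fromJson`, `merge`, `clone`); how the Params instance is populated is deliberately left out, and the class/package name of the sketch itself is made up.

{code:java}
import org.apache.flink.ml.api.misc.param.Params;

// Sketch only: illustrates why the static factory reads better at call sites.
public class ParamsJsonRoundTrip {
	public static void main(String[] args) {
		Params original = new Params();          // assumed to be populated elsewhere

		String json = original.toJson();         // serialize every parameter to JSON

		// two-step restore with the instance method
		Params restoredA = new Params();
		restoredA.loadJson(json);

		// one-step restore with the new factory method
		Params restoredB = Params.fromJson(json);

		// the restored instance composes with the other new helpers
		Params combined = restoredB.clone().merge(restoredA);
		System.out.println(combined.toJson());
	}
}
{code}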


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-14 Thread GitBox
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add 
shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r293825335
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java
 ##
 @@ -114,38 +171,104 @@ public Params clone() {
 * @return a json containing all parameters in this Params
 */
public String toJson() {
-   ObjectMapper mapper = new ObjectMapper();
-   Map stringMap = new HashMap<>();
try {
-   for (Map.Entry e : paramMap.entrySet()) 
{
-   stringMap.put(e.getKey(), 
mapper.writeValueAsString(e.getValue()));
-   }
-   return mapper.writeValueAsString(stringMap);
+   return mapper.writeValueAsString(params);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize params 
to json", e);
}
}
 
/**
 * Restores the parameters from the given json. The parameters should 
be exactly the same with
-* the one who was serialized to the input json after the restoration. 
The class mapping of the
-* parameters in the json is required because it is hard to directly 
restore a param of a user
-* defined type. Params will be treated as String if it doesn't exist 
in the {@code classMap}.
+* the one who was serialized to the input json after the restoration.
 *
-* @param json the json String to restore from
-* @param classMap the classes of the parameters contained in the json
+* @param json the json String to restore from
 */
@SuppressWarnings("unchecked")
-   public void loadJson(String json, Map> classMap) {
+   public void loadJson(String json) {
ObjectMapper mapper = new ObjectMapper();
+   Map params;
try {
-   Map m = mapper.readValue(json, 
Map.class);
-   for (Map.Entry e : m.entrySet()) {
-   Class valueClass = 
classMap.getOrDefault(e.getKey(), String.class);
-   paramMap.put(e.getKey(), 
mapper.readValue(e.getValue(), valueClass));
+   params = mapper.readValue(json, Map.class);
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to deserialize 
json:" + json, e);
+   }
+   this.params.clear();
 
 Review comment:
   Yes, I have changed it to merge the new params into the existing params.
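
   For what it's worth, a tiny self-contained illustration of the difference between the clear-then-load shown in the diff and the merge-on-load described here. Plain maps stand in for the internal map of Params (an assumption of this sketch), and the keys/values are made up.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class MergeVsReplace {
	public static void main(String[] args) {
		Map<String, String> existing = new HashMap<>();
		existing.put("k1", "\"old\"");

		Map<String, String> loaded = new HashMap<>();
		loaded.put("k2", "\"new\"");

		// clear-then-load (as in the diff above): only the loaded entries survive
		Map<String, String> replaced = new HashMap<>(existing);
		replaced.clear();
		replaced.putAll(loaded);   // {k2="new"}

		// merge-on-load (as described in this comment): existing entries are kept
		// unless the loaded JSON overwrites them
		Map<String, String> merged = new HashMap<>(existing);
		merged.putAll(loaded);     // {k1="old", k2="new"}

		System.out.println(replaced);
		System.out.println(merged);
	}
}
{code}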


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-14 Thread GitBox
xuyang1706 commented on a change in pull request #8632: [FLINK-12744][ml] add 
shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r293823761
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java
 ##
 @@ -114,38 +171,104 @@ public Params clone() {
 * @return a json containing all parameters in this Params
 */
public String toJson() {
-   ObjectMapper mapper = new ObjectMapper();
-   Map stringMap = new HashMap<>();
try {
-   for (Map.Entry e : paramMap.entrySet()) 
{
-   stringMap.put(e.getKey(), 
mapper.writeValueAsString(e.getValue()));
-   }
-   return mapper.writeValueAsString(stringMap);
+   return mapper.writeValueAsString(params);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize params 
to json", e);
}
}
 
/**
 * Restores the parameters from the given json. The parameters should 
be exactly the same with
-* the one who was serialized to the input json after the restoration. 
The class mapping of the
-* parameters in the json is required because it is hard to directly 
restore a param of a user
-* defined type. Params will be treated as String if it doesn't exist 
in the {@code classMap}.
+* the one who was serialized to the input json after the restoration.
 *
-* @param json the json String to restore from
-* @param classMap the classes of the parameters contained in the json
+* @param json the json String to restore from
 */
@SuppressWarnings("unchecked")
-   public void loadJson(String json, Map> classMap) {
+   public void loadJson(String json) {
ObjectMapper mapper = new ObjectMapper();
+   Map params;
try {
-   Map m = mapper.readValue(json, 
Map.class);
-   for (Map.Entry e : m.entrySet()) {
-   Class valueClass = 
classMap.getOrDefault(e.getKey(), String.class);
-   paramMap.put(e.getKey(), 
mapper.readValue(e.getValue(), valueClass));
+   params = mapper.readValue(json, Map.class);
+   } catch (IOException e) {
+   throw new RuntimeException("Failed to deserialize 
json:" + json, e);
+   }
+   this.params.clear();
+   this.params.putAll(params);
+   }
+
+   /**
+* Factory method for constructing params.
+*
+* @param json the json string to load
+* @return the {@code Params} loaded from the json string.
+*/
+   public static Params fromJson(String json) {
+   Params params = new Params();
+   params.loadJson(json);
+   return params;
+   }
+
+   /**
+* Merge other params into this.
+*
+* @param otherParams other params
+* @return this
+*/
+   public Params merge(Params otherParams) {
+   if (otherParams != null) {
+   this.params.putAll(otherParams.params);
+   }
+   return this;
+   }
+
+   /**
+* Creates and returns a deep clone of this Params.
+*
+* @return a deep clone of this Params
+*/
+   @Override
+   public Params clone() {
+   Params newParams = new Params();
+   newParams.params.putAll(this.params);
+   return newParams;
+   }
+
+   private void assertMapperInited() {
+   if (mapper == null) {
+   mapper = new ObjectMapper();
+   }
+   }
+
+   private String valueToJson(Object value) {
+   assertMapperInited();
+   try {
+   if (value == null) {
+   return null;
}
+   return mapper.writeValueAsString(value);
+   } catch (JsonProcessingException e) {
+   throw new RuntimeException("Failed to serialize to 
json:" + value, e);
+   }
+   }
+
+   private  T valueFromJson(String json, Class clazz) {
+   assertMapperInited();
+   try {
+   if (json == null) {
+   return null;
+   }
+   return mapper.readValue(json, clazz);
} catch (IOException e) {
throw new RuntimeException("Failed to deserialize 
json:" + json, e);
}
}
+
+   private  List getParamNameAndAlias(
+   ParamInfo  paramInfo) {
+

[GitHub] [flink] shaomengwang commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-14 Thread GitBox
shaomengwang commented on a change in pull request #8632: [FLINK-12744][ml] add 
shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r293789454
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java
 ##
 @@ -44,17 +86,39 @@
 * @param   the type of the specific parameter
 * @return the value of the specific parameter, or default value 
defined in the {@code info} if
 * this Params doesn't contain the parameter
-* @throws RuntimeException if the Params doesn't contains the specific 
parameter, while the
-*  param is not optional but has no default 
value in the {@code info}
+* @throws IllegalArgumentException if the Params doesn't contains the 
specific parameter, while the
+*  param is not optional but has no default 
value in the {@code info} or
+*  if the Params contains the specific 
parameter and alias, but has more
+*  than one value or
+*  if the Params doesn't contains the specific 
parameter, while the ParamInfo
+*  is optional but has no default value
 */
-   @SuppressWarnings("unchecked")
public  V get(ParamInfo info) {
-   V value = (V) paramMap.getOrDefault(info.getName(), 
info.getDefaultValue());
-   if (value == null && !info.isOptional() && 
!info.hasDefaultValue()) {
-   throw new RuntimeException(info.getName() +
-   " not exist which is not optional and don't 
have a default value");
+   String value = null;
+   String usedParamName = null;
+   for (String nameOrAlias : getParamNameAndAlias(info)) {
+   String v = params.get(nameOrAlias);
+   if (value != null && v != null) {
+   throw new 
IllegalArgumentException(String.format("Duplicate parameters of %s and %s",
 
 Review comment:
   `params.put` can use `ParamInfo.getName` to get the key, so it will not be able 
to generate duplicates, and there is no information about the param name and its 
aliases at `putAll` from json. So `params.get` here is probably the proper place 
to check for duplicates.
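
   To make the argument concrete, here is a small self-contained sketch. The parameter names are made up, and a plain map stands in for the internal storage of Params: duplicates can only appear when a JSON produced elsewhere carries both the canonical name and an alias, and the lookup loop in `get` shown in the diff is the first place that sees both keys.

{code:java}
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AliasConflictSketch {
	public static void main(String[] args) {
		// plain map standing in for the internal map of Params, filled via
		// putAll from a JSON written by some other tool
		Map<String, String> params = new HashMap<>();
		params.put("numIterations", "10");   // canonical name (made up)
		params.put("maxIter", "20");         // alias of the same parameter (made up)

		// the name-plus-aliases list that get(ParamInfo) iterates over
		List<String> nameAndAliases = Arrays.asList("numIterations", "maxIter");

		try {
			String value = null;
			for (String key : nameAndAliases) {
				String v = params.get(key);
				if (value != null && v != null) {
					// mirrors the duplicate check in the diff above
					throw new IllegalArgumentException(
							"Duplicate parameters of numIterations and " + key);
				}
				if (v != null) {
					value = v;
				}
			}
			System.out.println(value);
		} catch (IllegalArgumentException e) {
			System.out.println("conflict detected at lookup time: " + e.getMessage());
		}
	}
}
{code}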


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12619) Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-14 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864098#comment-16864098
 ] 

Aljoscha Krettek commented on FLINK-12619:
--

We had a short (but productive) call on this topic, so don't be surprised by us 
agreeing suddenly. ;-)

After our discussion and having read the document, I now agree with this view 
and think the analogy to database snapshots is very good. We should go forward 
with this stop-with-checkpoint feature.

Regarding FLIP-41, we agreed to call it unified format even though it's for now 
only used for savepoints.

> Support TERMINATE/SUSPEND Job with Checkpoint
> -
>
> Key: FLINK-12619
> URL: https://issues.apache.org/jira/browse/FLINK-12619
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / State Backends
>Reporter: Congxian Qiu(klion26)
>Assignee: Congxian Qiu(klion26)
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Inspired by the idea of FLINK-11458, we propose to support terminating/suspending 
> a job with a checkpoint. This improvement cooperates with the incremental and 
> external checkpoint features: if checkpoints are retained and this feature 
> is configured, we will trigger a checkpoint before the job stops. It could 
> accelerate job recovery a lot since:
> 1. No source rewinding is required any more.
> 2. It's much faster than taking a savepoint when incremental checkpointing is 
> enabled.
> Please note that conceptually savepoints are different from checkpoints in a 
> similar way that backups are different from recovery logs in traditional 
> database systems. So we suggest using this feature only for job recovery, 
> while sticking with FLINK-11458 for the 
> upgrading/cross-cluster-job-migration/state-backend-switch cases.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12619) Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-14 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864068#comment-16864068
 ] 

Aljoscha Krettek edited comment on FLINK-12619 at 6/14/19 1:59 PM:
---

Wrote a document to arrange and better share my thoughts, and below is a brief 
summary:

# Conceptually it is worth a second thought before introducing an optimized snapshot format for now (i.e. using the checkpoint format for savepoints), just like it's not recommended to use snapshots for backups in databases (although practically it could be implemented).
# The stop-with-checkpoint mechanism is like stopping a database instance with a data flush, thus (IMHO) a different story from the checkpoint/savepoint (db snapshot/backup) distinction.
# In the long run we may improve checkpoints to allow a short enough interval that they become a form of transactional log; then we could enable checkpoint-based savepoints (like transactional-log-based backups). So I agree to still call the new format in FLIP-41 a "Unified Format", although in the short term it only unifies savepoints.

Please check it and let me know your thoughts everyone. Thanks!

The document: 
https://docs.google.com/document/d/1uE4R3wNal6e67FkDe0UvcnsIMMDpr35j



was (Author: carp84):
Wrote a document to arrange and better share my thoughts, and below is a brief 
summary:

# Conceptually it is worth a second thought before introducing an optimized snapshot format for now (i.e. using the checkpoint format for savepoints), just like it's not recommended to use snapshots for backups in databases (although practically it could be implemented).
# The stop-with-checkpoint mechanism is like stopping a database instance with a data flush, thus (IMHO) a different story from the checkpoint/savepoint (db snapshot/backup) distinction.
# In the long run we may improve checkpoints to allow a short enough interval that they become a form of transactional log; then we could enable checkpoint-based savepoints (like transactional-log-based backups). So I agree to still call the new format in FLIP-41 a "Unified Format", although in the short term it only unifies savepoints.

Please check it and let me know your thoughts everyone. Thanks!

> Support TERMINATE/SUSPEND Job with Checkpoint
> -
>
> Key: FLINK-12619
> URL: https://issues.apache.org/jira/browse/FLINK-12619
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / State Backends
>Reporter: Congxian Qiu(klion26)
>Assignee: Congxian Qiu(klion26)
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Inspired by the idea of FLINK-11458, we propose to support terminating/suspending 
> a job with a checkpoint. This improvement cooperates with the incremental and 
> external checkpoint features: if checkpoints are retained and this feature 
> is configured, we will trigger a checkpoint before the job stops. It could 
> accelerate job recovery a lot since:
> 1. No source rewinding is required any more.
> 2. It's much faster than taking a savepoint when incremental checkpointing is 
> enabled.
> Please note that conceptually savepoints are different from checkpoints in a 
> similar way that backups are different from recovery logs in traditional 
> database systems. So we suggest using this feature only for job recovery, 
> while sticking with FLINK-11458 for the 
> upgrading/cross-cluster-job-migration/state-backend-switch cases.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12773) Unstable kafka e2e test

2019-06-14 Thread Alex (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864095#comment-16864095
 ] 

Alex commented on FLINK-12773:
--

Is it possible to download/prepare external dependencies once and provision 
them in the CI environment?
Alternatively, upload them into S3 and use S3 storage instead.

Both options would add maintenance overhead, but at least such an approach would 
be more reliable.

> Unstable kafka e2e test
> ---
>
> Key: FLINK-12773
> URL: https://issues.apache.org/jira/browse/FLINK-12773
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Tests
>Affects Versions: 1.9.0
>Reporter: Dawid Wysakowicz
>Priority: Major
>
> 'Kafka 0.10 end-to-end test' fails on Travis occasionally because of 
> corrupted downloaded kafka archive.
> https://api.travis-ci.org/v3/job/542507472/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12143) Mechanism to ship plugin jars in the cluster

2019-06-14 Thread Alex (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864085#comment-16864085
 ] 

Alex commented on FLINK-12143:
--

The problem occurs because
 * {{parentFirstLoaderPatterns}} contains {{org.apache.flink.}} as a matching prefix;
 * the Hadoop file system component loads some classes dynamically 
({{java.lang.Class.forName}}), and one of them, 
{{org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.PropertiesConfiguration}}, 
matches the pattern.
This triggers the use of the application-wide (global) class loader instead of 
searching in the plugin jar.


The test failure can be reproduced by slightly modifying {{PluginLoaderTest}}: 
set a matching {{loaderExcludePatterns}} on [line 
42|https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginLoaderTest.java#L42], 
for example to:
{code:java}
// Use explicit prefix of org.apache.flink.test.plugin.jar.plugina.DynamicClassA
PluginDescriptor pluginDescriptorA = new PluginDescriptor("A", new 
URL[]{classpathA}, new String[]{"org.apache.flink."});
{code}
 
Note that running this test in the IDE would not throw a {{ClassNotFoundException}}, 
but the actual class lookup would still use the global class loader.
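
To illustrate the mechanism, here is a simplified sketch (not the actual implementation used by {{PluginLoader}}, and the class name is made up) of a child-first class loader with parent-first prefixes: any class whose name matches one of the prefixes is delegated to the parent, which is exactly what happens to the shaded Hadoop class above once {{org.apache.flink.}} is in the pattern list.

{code:java}
import java.net.URL;
import java.net.URLClassLoader;

// Simplified child-first class loader with parent-first prefixes (sketch only).
class ChildFirstClassLoader extends URLClassLoader {

	private final String[] parentFirstPrefixes;

	ChildFirstClassLoader(URL[] urls, ClassLoader parent, String[] parentFirstPrefixes) {
		super(urls, parent);
		this.parentFirstPrefixes = parentFirstPrefixes;
	}

	@Override
	protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
		for (String prefix : parentFirstPrefixes) {
			if (name.startsWith(prefix)) {
				// "org.apache.flink.fs.shaded.hadoop3.org.apache.commons..." matches
				// "org.apache.flink." and is delegated to the parent class loader,
				// even though the class only exists inside the plugin jar.
				return super.loadClass(name, resolve);
			}
		}
		synchronized (getClassLoadingLock(name)) {
			Class<?> c = findLoadedClass(name);
			if (c == null) {
				try {
					c = findClass(name);                 // child-first: look in the plugin jar
				} catch (ClassNotFoundException e) {
					c = super.loadClass(name, resolve);  // fall back to the parent
				}
			}
			if (resolve) {
				resolveClass(c);
			}
			return c;
		}
	}
}
{code}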

> Mechanism to ship plugin jars in the cluster
> 
>
> Key: FLINK-12143
> URL: https://issues.apache.org/jira/browse/FLINK-12143
> Project: Flink
>  Issue Type: Sub-task
>  Components: FileSystems, Runtime / Coordination
>Reporter: Stefan Richter
>Assignee: Alex
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] EugeneYushin commented on issue #8187: [FLINK-12197] [Formats] Avro row deser for Confluent binary format

2019-06-14 Thread GitBox
EugeneYushin commented on issue #8187: [FLINK-12197] [Formats] Avro row deser 
for Confluent binary format
URL: https://github.com/apache/flink/pull/8187#issuecomment-502113331
 
 
   @dawidwys can you please ping an another person who can help reviewing this 
PR in case you have no capacity?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-12619) Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-14 Thread Yu Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864068#comment-16864068
 ] 

Yu Li edited comment on FLINK-12619 at 6/14/19 1:32 PM:


Wrote a document to arrange and better share my thoughts, and below is a brief 
summary:

# Conceptually it is worth a second thought before introducing an optimized snapshot format for now (i.e. using the checkpoint format for savepoints), just like it's not recommended to use snapshots for backups in databases (although practically it could be implemented).
# The stop-with-checkpoint mechanism is like stopping a database instance with a data flush, thus (IMHO) a different story from the checkpoint/savepoint (db snapshot/backup) distinction.
# In the long run we may improve checkpoints to allow a short enough interval that they become a form of transactional log; then we could enable checkpoint-based savepoints (like transactional-log-based backups). So I agree to still call the new format in FLIP-41 a "Unified Format", although in the short term it only unifies savepoints.

Please check it and let me know your thoughts everyone. Thanks!


was (Author: carp84):
Wrote a document to arrange and better share my thoughts, and below is a brief 
summary:

1. Conceptually it is worth a second thought before introducing an optimized snapshot format for now (i.e. using the checkpoint format for savepoints), just like it's not recommended to use snapshots for backups in databases (although practically it could be implemented).
2. The stop-with-checkpoint mechanism is like stopping a database instance with a data flush, thus (IMHO) a different story from the checkpoint/savepoint (db snapshot/backup) distinction.
3. In the long run we may improve checkpoints to allow a short enough interval that they become a form of transactional log; then we could enable checkpoint-based savepoints (like transactional-log-based backups). So I agree to still call the new format in FLIP-41 a "Unified Format", although in the short term it only unifies savepoints.

Please check it and let me know your thoughts everyone. Thanks!

> Support TERMINATE/SUSPEND Job with Checkpoint
> -
>
> Key: FLINK-12619
> URL: https://issues.apache.org/jira/browse/FLINK-12619
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / State Backends
>Reporter: Congxian Qiu(klion26)
>Assignee: Congxian Qiu(klion26)
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Inspired by the idea of FLINK-11458, we propose to support terminating/suspending 
> a job with a checkpoint. This improvement cooperates with the incremental and 
> external checkpoint features: if checkpoints are retained and this feature 
> is configured, we will trigger a checkpoint before the job stops. It could 
> accelerate job recovery a lot since:
> 1. No source rewinding is required any more.
> 2. It's much faster than taking a savepoint when incremental checkpointing is 
> enabled.
> Please note that conceptually savepoints are different from checkpoints in a 
> similar way that backups are different from recovery logs in traditional 
> database systems. So we suggest using this feature only for job recovery, 
> while sticking with FLINK-11458 for the 
> upgrading/cross-cluster-job-migration/state-backend-switch cases.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12619) Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-14 Thread Yu Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864068#comment-16864068
 ] 

Yu Li commented on FLINK-12619:
---

Wrote a document to arrange and better share my thoughts, and below is a brief 
summary:
1. Conceptually it is worth a second thought before introducing an optimized snapshot format for now (i.e. using the checkpoint format for savepoints), just like it's not recommended to use snapshots for backups in databases (although practically it could be implemented).
2. The stop-with-checkpoint mechanism is like stopping a database instance with a data flush, thus (IMHO) a different story from the checkpoint/savepoint (db snapshot/backup) distinction.
3. In the long run we may improve checkpoints to allow a short enough interval that they become a form of transactional log; then we could enable checkpoint-based savepoints (like transactional-log-based backups). So I agree to still call the new format in FLIP-41 a "Unified Format", although in the short term it only unifies savepoints.

Please check it and let me know your thoughts everyone. Thanks!

> Support TERMINATE/SUSPEND Job with Checkpoint
> -
>
> Key: FLINK-12619
> URL: https://issues.apache.org/jira/browse/FLINK-12619
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / State Backends
>Reporter: Congxian Qiu(klion26)
>Assignee: Congxian Qiu(klion26)
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Inspired by the idea of FLINK-11458, we propose to support terminating/suspending 
> a job with a checkpoint. This improvement cooperates with the incremental and 
> external checkpoint features: if checkpoints are retained and this feature 
> is configured, we will trigger a checkpoint before the job stops. It could 
> accelerate job recovery a lot since:
> 1. No source rewinding is required any more.
> 2. It's much faster than taking a savepoint when incremental checkpointing is 
> enabled.
> Please note that conceptually savepoints are different from checkpoints in a 
> similar way that backups are different from recovery logs in traditional 
> database systems. So we suggest using this feature only for job recovery, 
> while sticking with FLINK-11458 for the 
> upgrading/cross-cluster-job-migration/state-backend-switch cases.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12619) Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-14 Thread Yu Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864068#comment-16864068
 ] 

Yu Li edited comment on FLINK-12619 at 6/14/19 1:31 PM:


Wrote a document to arrange and better share my thoughts, and below is a brief 
summary:

1. Conceptually it is worth a second thought before introducing an optimized snapshot format for now (i.e. using the checkpoint format for savepoints), just like it's not recommended to use snapshots for backups in databases (although practically it could be implemented).
2. The stop-with-checkpoint mechanism is like stopping a database instance with a data flush, thus (IMHO) a different story from the checkpoint/savepoint (db snapshot/backup) distinction.
3. In the long run we may improve checkpoints to allow a short enough interval that they become a form of transactional log; then we could enable checkpoint-based savepoints (like transactional-log-based backups). So I agree to still call the new format in FLIP-41 a "Unified Format", although in the short term it only unifies savepoints.

Please check it and let me know your thoughts everyone. Thanks!


was (Author: carp84):
Wrote a document to arrange and better share my thoughts, and below is a brief 
summary:
1. Conceptually it is worth a second thought before introducing an optimized snapshot format for now (i.e. using the checkpoint format for savepoints), just like it's not recommended to use snapshots for backups in databases (although practically it could be implemented).
2. The stop-with-checkpoint mechanism is like stopping a database instance with a data flush, thus (IMHO) a different story from the checkpoint/savepoint (db snapshot/backup) distinction.
3. In the long run we may improve checkpoints to allow a short enough interval that they become a form of transactional log; then we could enable checkpoint-based savepoints (like transactional-log-based backups). So I agree to still call the new format in FLIP-41 a "Unified Format", although in the short term it only unifies savepoints.

Please check it and let me know your thoughts everyone. Thanks!

> Support TERMINATE/SUSPEND Job with Checkpoint
> -
>
> Key: FLINK-12619
> URL: https://issues.apache.org/jira/browse/FLINK-12619
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / State Backends
>Reporter: Congxian Qiu(klion26)
>Assignee: Congxian Qiu(klion26)
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Inspired by the idea of FLINK-11458, we propose to support terminating/suspending 
> a job with a checkpoint. This improvement cooperates with the incremental and 
> external checkpoint features: if checkpoints are retained and this feature 
> is configured, we will trigger a checkpoint before the job stops. It could 
> accelerate job recovery a lot since:
> 1. No source rewinding is required any more.
> 2. It's much faster than taking a savepoint when incremental checkpointing is 
> enabled.
> Please note that conceptually savepoints are different from checkpoints in a 
> similar way that backups are different from recovery logs in traditional 
> database systems. So we suggest using this feature only for job recovery, 
> while sticking with FLINK-11458 for the 
> upgrading/cross-cluster-job-migration/state-backend-switch cases.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput

2019-06-14 Thread GitBox
sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] 
Implement the runtime handling of BoundedOneInput and BoundedMultiInput
URL: https://github.com/apache/flink/pull/8731#discussion_r293805008
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
 ##
 @@ -671,5 +737,100 @@ public InputSelection nextSelection() {
return InputSelection.ALL;
}
}
+
+   /**
+* Uses to test handling the EndOfInput notification.
+*/
+   private static class TestBoundedTwoInputOperator extends 
AbstractStreamOperator
+   implements TwoInputStreamOperator, 
BoundedMultiInput {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String name;
+
+   private volatile int outputRecords1;
+   private volatile int outputRecords2;
+
+   private static final CompletableFuture success = 
CompletableFuture.completedFuture(null);
+   private static final CompletableFuture failure = 
FutureUtils.completedExceptionally(
+   new Exception("Not in line with expectations."));
+
+   public TestBoundedTwoInputOperator(String name) {
+   this.name = name;
+   }
+
+   @Override
+   public void processElement1(StreamRecord element) {
+   output.collect(element.replace("[" + name + "-1]: " + 
element.getValue()));
+   outputRecords1++;
+
+   synchronized (this) {
+   this.notifyAll();
+   }
+   }
+
+   @Override
+   public void processElement2(StreamRecord element) {
+   output.collect(element.replace("[" + name + "-2]: " + 
element.getValue()));
+   outputRecords2++;
+
+   synchronized (this) {
+   this.notifyAll();
+   }
+   }
+
+   @Override
+   public void endInput(int inputId) {
+   output.collect(new StreamRecord<>("[" + name + "-" + 
inputId + "]: Bye"));
+
+   if (inputId == 1) {
+   outputRecords1++;
+   } else {
+   outputRecords2++;
+   }
+
+   synchronized (this) {
+   this.notifyAll();
+   }
+   }
+
+   public void waitOutputRecords(int inputId, int expectedRecords) 
throws Exception {
 
 Review comment:
   This ensures that the output is strictly ordered, so we can verify whether 
`endInput()` is called immediately when `input1` reaches its end.
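
   For context, a minimal sketch of the kind of operator for which this timing matters. The class below is illustrative and not part of this PR; it only assumes the `BoundedOneInput` and `OneInputStreamOperator` interfaces named in the PR title (import paths are assumed). The operator emits its result only from `endInput()`, so the position of that record in the output reveals whether `endInput()` fired exactly when the input ended.

{code:java}
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Illustrative only: sums a bounded input and emits the result exactly once,
// from endInput(). If the runtime did not call endInput() immediately when the
// input reaches its end, the final record would show up late in the output,
// which is what the strict ordering in the test is meant to catch.
class SummingOnEndOfInputOperator extends AbstractStreamOperator<Long>
		implements OneInputStreamOperator<Long, Long>, BoundedOneInput {

	private long sum;

	@Override
	public void processElement(StreamRecord<Long> element) {
		sum += element.getValue();
	}

	@Override
	public void endInput() {
		// emitted exactly once, at the end of the bounded input
		output.collect(new StreamRecord<>(sum));
	}
}
{code}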


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12806) Remove beta feature remark from the Universal Kafka connector

2019-06-14 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-12806:
-
Priority: Major  (was: Blocker)

> Remove beta feature remark from the Universal Kafka connector
> -
>
> Key: FLINK-12806
> URL: https://issues.apache.org/jira/browse/FLINK-12806
> Project: Flink
>  Issue Type: Task
>  Components: Connectors / Kafka
>Reporter: Piotr Nowojski
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> I think we can remove this remark from the docs as in the last half year 
> there were no issues reported that would say otherwise.
> The remark about universal connector being a beta feature was introduced in: 
> https://issues.apache.org/jira/browse/FLINK-10900



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12776) Ambiguous content in flink-dist NOTICE file

2019-06-14 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16863997#comment-16863997
 ] 

sunjincheng edited comment on FLINK-12776 at 6/14/19 1:05 PM:
--

Thanks for the reply [~Zentol]! I agree with you that we can remove the old 
`flink-python`, and I'd like to share the good news here: then we do not need to 
make any changes. :) Please see the DISCUSS thread: 
[http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Deprecate-previous-Python-APIs-td29483.html]


was (Author: sunjincheng121):
Thanks for the reply [~Zentol]! I want to share the good news here that we can 
remove the old `flink-python`, then we do not need to make any changes. :) Please 
see the DISCUSS thread: 
[http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Deprecate-previous-Python-APIs-td29483.html]

> Ambiguous content in flink-dist NOTICE file
> ---
>
> Key: FLINK-12776
> URL: https://issues.apache.org/jira/browse/FLINK-12776
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Release System
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.9.0
>
> Attachments: image-2019-06-10-09-39-06-637.png
>
>
> With FLINK-12409 we include the new flink-python module in flink-dist. As a 
> result we now have 2 {{flink-python}} entries in the flink-dist NOTICE file, 
> one for the old batch API and one for the newly added one, which is 
> ambiguous. We should rectify this by either excluding the old batch API from 
> flink-dist, or rename the new module to something like {{flink-api-python}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12071) HadoopRecoverableWriterTest fails on Travis

2019-06-14 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864037#comment-16864037
 ] 

Till Rohrmann commented on FLINK-12071:
---

How would this harden the test case? Maybe you could try to run this single 
test in a loop on Travis to make it reproducible.

> HadoopRecoverableWriterTest fails on Travis
> ---
>
> Key: FLINK-12071
> URL: https://issues.apache.org/jira/browse/FLINK-12071
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Tests
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: leesf
>Priority: Critical
>  Labels: test-stability
>
> https://travis-ci.org/apache/flink/jobs/513373067
> {code}
> testExceptionWritingAfterCloseForCommit(org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriterTest)
>   Time elapsed: 0.031 s  <<< ERROR!
> java.lang.Exception: Unexpected exception, expected but 
> was
> Caused by: java.lang.IllegalArgumentException: Self-suppression not permitted
> Caused by: java.io.IOException: The stream is closed
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8743: [FLINK-12851][travis] Move gelly/kafka to separate profile

2019-06-14 Thread GitBox
flinkbot commented on issue #8743: [FLINK-12851][travis] Move gelly/kafka to 
separate profile 
URL: https://github.com/apache/flink/pull/8743#issuecomment-502097767
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput

2019-06-14 Thread GitBox
sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] 
Implement the runtime handling of BoundedOneInput and BoundedMultiInput
URL: https://github.com/apache/flink/pull/8731#discussion_r293744349
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
 ##
 @@ -597,6 +603,54 @@ public void testQuiesceTimerServiceAfterOpClose() throws 
Exception {
timeService.shutdownService();
}
 
+   @Test
+   public void testHandlingEndOfInput() throws Exception {
+   final OneInputStreamTaskTestHarness testHarness 
= new OneInputStreamTaskTestHarness<>(
+   OneInputStreamTask::new,
+   2, 2,
+   BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
+
+   testHarness
+   .setupOperatorChain(new OperatorID(), new 
TestBoundedOneInputOperator("Operator0"))
+   .chain(
+   new OperatorID(),
+   new TestBoundedOneInputOperator("Operator1"),
+   
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+   .finish();
+
+   ConcurrentLinkedQueue expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+   testHarness.invoke();
+   testHarness.waitForTaskRunning();
+
+   TestBoundedOneInputOperator headOperator = 
(TestBoundedOneInputOperator) testHarness.getTask().headOperator;
+
+   testHarness.processElement(new StreamRecord<>("Hello-0-0"), 0, 
0);
 
 Review comment:
   This test was written a long time ago, when these parameters were needed. Now 
the `inputGate` and `channel` parameters are no longer needed, and I will delete them.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput

2019-06-14 Thread GitBox
sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] 
Implement the runtime handling of BoundedOneInput and BoundedMultiInput
URL: https://github.com/apache/flink/pull/8731#discussion_r293793699
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
 ##
 @@ -1011,5 +1065,70 @@ protected void handleWatermark(Watermark mark) {
output.emitWatermark(mark);
}
}
+
+   /**
+* Uses to test handling the EndOfInput notification.
+*/
+   static class TestBoundedOneInputOperator extends 
AbstractStreamOperator
+   implements OneInputStreamOperator, 
BoundedOneInput {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String name;
+
+   private volatile int outputRecords;
+
+   private static final CompletableFuture success = 
CompletableFuture.completedFuture(null);
+   private static final CompletableFuture failure = 
FutureUtils.completedExceptionally(
+   new Exception("Not in line with expectation."));
+
+   public TestBoundedOneInputOperator(String name) {
+   this.name = name;
+   }
+
+   @Override
+   public void processElement(StreamRecord element) {
+   output.collect(element);
+   outputRecords++;
+
+   synchronized (this) {
+   this.notifyAll();
+   }
+   }
+
+   @Override
+   public void endInput() {
+   output.collect(new StreamRecord<>("[" + name + "]: 
Bye"));
+   outputRecords++;
+
+   synchronized (this) {
+   this.notifyAll();
+   }
+   }
+
+   public void waitOutputRecords(int expectedRecords) throws 
Exception {
 
 Review comment:
   This test was written a long time ago, when it was needed. It is no longer 
needed now, and I will remove it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12851) Add travis profile for gelly/kafka

2019-06-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12851:
---
Labels: pull-request-available  (was: )

> Add travis profile for gelly/kafka
> --
>
> Key: FLINK-12851
> URL: https://issues.apache.org/jira/browse/FLINK-12851
> Project: Flink
>  Issue Type: Improvement
>  Components: Travis
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> The misc/tests profiles frequently hit timeouts; we can resolve this by 
> moving gelly and the universal kafka connector into a separate profile.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zentol opened a new pull request #8743: [FLINK-12851][travis] Move gelly/kafka to separate profile

2019-06-14 Thread GitBox
zentol opened a new pull request #8743: [FLINK-12851][travis] Move gelly/kafka 
to separate profile 
URL: https://github.com/apache/flink/pull/8743
 
 
   Introduces another test profile for running gelly and kafka (universal) 
tests.
   
   Additionally, now that kafka is out of misc, the pre-commit tests are moved 
back into misc.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] shaomengwang commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-14 Thread GitBox
shaomengwang commented on a change in pull request #8632: [FLINK-12744][ml] add 
shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r293789454
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java
 ##
 @@ -44,17 +86,39 @@
 * @param   the type of the specific parameter
 * @return the value of the specific parameter, or default value 
defined in the {@code info} if
 * this Params doesn't contain the parameter
-* @throws RuntimeException if the Params doesn't contains the specific 
parameter, while the
-*  param is not optional but has no default 
value in the {@code info}
+* @throws IllegalArgumentException if the Params doesn't contains the 
specific parameter, while the
+*  param is not optional but has no default 
value in the {@code info} or
+*  if the Params contains the specific 
parameter and alias, but has more
+*  than one value or
+*  if the Params doesn't contains the specific 
parameter, while the ParamInfo
+*  is optional but has no default value
 */
-   @SuppressWarnings("unchecked")
public  V get(ParamInfo info) {
-   V value = (V) paramMap.getOrDefault(info.getName(), 
info.getDefaultValue());
-   if (value == null && !info.isOptional() && 
!info.hasDefaultValue()) {
-   throw new RuntimeException(info.getName() +
-   " not exist which is not optional and don't 
have a default value");
+   String value = null;
+   String usedParamName = null;
+   for (String nameOrAlias : getParamNameAndAlias(info)) {
+   String v = params.get(nameOrAlias);
+   if (value != null && v != null) {
+   throw new 
IllegalArgumentException(String.format("Duplicate parameters of %s and %s",
 
 Review comment:
   `params.put` can use `ParamInfo.getName` to get the key, so it will not be able 
to generate duplicates, and there is no information about the param name and its 
aliases at `putAll` from json. So `params.get` here is probably the proper place 
to check for duplicates.
   
   I agree with you that duplicates should also be checked in `ParamInfoFactory.setAlias`.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-12541) Add deploy a Python Flink job and session cluster on Kubernetes support.

2019-06-14 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16862841#comment-16862841
 ] 

Till Rohrmann edited comment on FLINK-12541 at 6/14/19 12:46 PM:
-

-Implemented via 00d90a4fe3ff93d19ff6be3ed7b9cf9d0f5b0dfd-


was (Author: till.rohrmann):
Implemented via 00d90a4fe3ff93d19ff6be3ed7b9cf9d0f5b0dfd

> Add deploy a Python Flink job and session cluster on Kubernetes support.
> 
>
> Key: FLINK-12541
> URL: https://issues.apache.org/jira/browse/FLINK-12541
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Runtime / REST
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Add deploy a Python Flink job and session cluster on Kubernetes support.
> We need to have the same deployment step as the Java job. Please see: 
> [https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] shaomengwang commented on a change in pull request #8632: [FLINK-12744][ml] add shared params in ml package

2019-06-14 Thread GitBox
shaomengwang commented on a change in pull request #8632: [FLINK-12744][ml] add 
shared params in ml package
URL: https://github.com/apache/flink/pull/8632#discussion_r293789454
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-api/src/main/java/org/apache/flink/ml/api/misc/param/Params.java
 ##
 @@ -44,17 +86,39 @@
 * @param   the type of the specific parameter
 * @return the value of the specific parameter, or default value 
defined in the {@code info} if
 * this Params doesn't contain the parameter
-* @throws RuntimeException if the Params doesn't contains the specific 
parameter, while the
-*  param is not optional but has no default 
value in the {@code info}
+* @throws IllegalArgumentException if the Params doesn't contains the 
specific parameter, while the
+*  param is not optional but has no default 
value in the {@code info} or
+*  if the Params contains the specific 
parameter and alias, but has more
+*  than one value or
+*  if the Params doesn't contains the specific 
parameter, while the ParamInfo
+*  is optional but has no default value
 */
-   @SuppressWarnings("unchecked")
public  V get(ParamInfo info) {
-   V value = (V) paramMap.getOrDefault(info.getName(), 
info.getDefaultValue());
-   if (value == null && !info.isOptional() && 
!info.hasDefaultValue()) {
-   throw new RuntimeException(info.getName() +
-   " not exist which is not optional and don't 
have a default value");
+   String value = null;
+   String usedParamName = null;
+   for (String nameOrAlias : getParamNameAndAlias(info)) {
+   String v = params.get(nameOrAlias);
+   if (value != null && v != null) {
+   throw new 
IllegalArgumentException(String.format("Duplicate parameters of %s and %s",
 
 Review comment:
   `params.put` can use `ParamInfo.getName` to get the key, so it will not be able 
to generate duplicates. There is no information about the param name and its 
aliases at `putAll` from json, so `params.get` is probably the proper place to 
check for duplicates.
   
   I agree with you that duplicates should also be checked in `ParamInfoFactory.setAlias`.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann closed pull request #7227: [FLINK-11059] [runtime] do not add releasing failed slot to free slots

2019-06-14 Thread GitBox
tillrohrmann closed pull request #7227: [FLINK-11059] [runtime] do not add 
releasing failed slot to free slots
URL: https://github.com/apache/flink/pull/7227
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (FLINK-11059) JobMaster may continue using an invalid slot if releasing idle slot meet a timeout

2019-06-14 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-11059.
---
Resolution: Fixed

Fixed via
1.9.0: a4865cd67239c938e0075bcc2b112b944cc3c249
1.8.1: 5ef79de88726ddfccee30cb34a22711ca2b1116f
1.7.3: 65b6f99a69dbe65fc2dfaa97d0662f76909ef371

> JobMaster may continue using an invalid slot if releasing idle slot meet a 
> timeout
> --
>
> Key: FLINK-11059
> URL: https://issues.apache.org/jira/browse/FLINK-11059
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.7.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
>
> When the job master releases an idle slot to the task executor, it may hit a 
> timeout exception. In that case the task executor may have already released the 
> slot, but the job master will add the slot back to its available slots, and the 
> slot may be used again. The job master will then continue deploying tasks to the 
> slot, but the task executor does not recognize it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 commented on issue #8623: [FLINK-12719][python] Add the Python catalog API

2019-06-14 Thread GitBox
sunjincheng121 commented on issue #8623: [FLINK-12719][python] Add the Python 
catalog API
URL: https://github.com/apache/flink/pull/8623#issuecomment-502091680
 
 
   +1 to merge 💯 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL

2019-06-14 Thread GitBox
wuchong commented on a change in pull request #8718: 
[FLINK-12824][table-planner-blink] Set parallelism for stream SQL
URL: https://github.com/apache/flink/pull/8718#discussion_r293765451
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
 ##
 @@ -88,11 +89,17 @@ class StreamExecTableSourceScan(
 replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
   }
 
+  def getSourceTransformation(
+  streamEnv: StreamExecutionEnvironment): StreamTransformation[_] = {
+
tableSource.asInstanceOf[StreamTableSource[_]].getDataStream(streamEnv).getTransformation
 
 Review comment:
   We should cache the source transformation in this node to avoid calling 
`getDataStream` multiple times, which would lead to multiple source 
transformations in the JobGraph. And make sure to call this 
`getSourceTransformation` instead of `getDataStream` in this class.
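
   In other words, the usual memoization pattern. A self-contained sketch of the idea follows (in Java for brevity, while the node itself is Scala; the class and field names are illustrative):

{code:java}
import java.util.function.Supplier;

// Compute an expensive value once and hand out the cached instance afterwards,
// standing in for caching the result of getDataStream(...).getTransformation().
final class MemoizedTransformation<T> implements Supplier<T> {

	private final Supplier<T> factory;
	private T cached;

	MemoizedTransformation(Supplier<T> factory) {
		this.factory = factory;
	}

	@Override
	public synchronized T get() {
		if (cached == null) {
			cached = factory.get();   // invoked at most once
		}
		return cached;
	}

	public static void main(String[] args) {
		MemoizedTransformation<String> source = new MemoizedTransformation<>(() -> {
			System.out.println("translating source (expensive, should happen once)");
			return "sourceTransformation";
		});
		// both callers see the very same instance, so the JobGraph gets one source
		System.out.println(source.get() == source.get());   // true
	}
}
{code}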


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL

2019-06-14 Thread GitBox
wuchong commented on a change in pull request #8718: 
[FLINK-12824][table-planner-blink] Set parallelism for stream SQL
URL: https://github.com/apache/flink/pull/8718#discussion_r293751698
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
 ##
 @@ -121,6 +122,19 @@ class StreamExecSink[T](
 "implemented and return the sink transformation DataStreamSink. " +
 s"However, ${sink.getClass.getCanonicalName} doesn't implement 
this method.")
 }
+val configSinkParallelism = NodeResourceConfig.getSinkParallelism(
+  tableEnv.getConfig.getConf)
+
+val maxSinkParallelism = dsSink.getTransformation.getMaxParallelism
+
+if (maxSinkParallelism > 0 && configSinkParallelism <= 
maxSinkParallelism) {
 
 Review comment:
   The same problem as `BatchExecSink`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL

2019-06-14 Thread GitBox
wuchong commented on a change in pull request #8718: 
[FLINK-12824][table-planner-blink] Set parallelism for stream SQL
URL: https://github.com/apache/flink/pull/8718#discussion_r293773998
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecCalc.scala
 ##
 @@ -116,11 +116,16 @@ class StreamExecCalc(
   retainHeader = true,
   "StreamExecCalc"
 )
-new OneInputTransformation(
+val ret = new OneInputTransformation(
   inputTransform,
   RelExplainUtil.calcToString(calcProgram, getExpressionString),
   substituteStreamOperator,
   BaseRowTypeInfo.of(outputType),
-  inputTransform.getParallelism)
+  getResource.getParallelism)
+
+if (getResource.getMaxParallelism > 0) {
+  ret.setMaxParallelism(getResource.getMaxParallelism)
 
 Review comment:
   Do we need to set the max parallelism for the calc? For example, with a 
non-parallel source followed by a calc, I think the calc can still scale out.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL

2019-06-14 Thread GitBox
wuchong commented on a change in pull request #8718: 
[FLINK-12824][table-planner-blink] Set parallelism for stream SQL
URL: https://github.com/apache/flink/pull/8718#discussion_r293752274
 
 

 ##
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
 ##
 @@ -121,6 +122,19 @@ class StreamExecSink[T](
 "implemented and return the sink transformation DataStreamSink. " +
 s"However, ${sink.getClass.getCanonicalName} doesn't implement this method.")
 }
+val configSinkParallelism = NodeResourceConfig.getSinkParallelism(
+  tableEnv.getConfig.getConf)
+
+val maxSinkParallelism = dsSink.getTransformation.getMaxParallelism
+
+if (maxSinkParallelism > 0 && configSinkParallelism <= maxSinkParallelism) {
+  dsSink.getTransformation.setParallelism(configSinkParallelism)
+}
+if (!UpdatingPlanChecker.isAppendOnly(this) &&
+dsSink.getTransformation.getParallelism != transformation.getParallelism) {
+  throw new TableException("Sink parallelism should be equal to input node when it is not" +
 
 Review comment:
   Improve the exception message:
   
   "The configured sink parallelism should be equal to the parallelism of the input node when the input is an update stream. The input parallelism is ${...}, however the configured sink parallelism is ${...}."
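   
   For illustration only (the surrounding `StreamExecSink` is Scala and the variable names below are assumed), the check could then raise:
   
   ```java
   throw new TableException(String.format(
     "The configured sink parallelism should be equal to the parallelism of the input node "
       + "when the input is an update stream. The input parallelism is %d, "
       + "however the configured sink parallelism is %d.",
     inputParallelism, configSinkParallelism));
   ```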




[GitHub] [flink] wuchong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL

2019-06-14 Thread GitBox
wuchong commented on a change in pull request #8718: 
[FLINK-12824][table-planner-blink] Set parallelism for stream SQL
URL: https://github.com/apache/flink/pull/8718#discussion_r293786650
 
 

 ##
 File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/nodes/resource/parallelism/FinalParallelismSetter.java
 ##
 @@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.resource.parallelism;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.table.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan;
+import org.apache.flink.table.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import org.apache.flink.table.plan.nodes.physical.stream.StreamExecTableSourceScan;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Values;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Set final parallelism if needed at the beginning time, if parallelism of a node is set to be final,
+ * it will not be changed by other parallelism calculator.
+ */
+public class FinalParallelismSetter {
+
+   private final StreamExecutionEnvironment env;
+   private Set<ExecNode<?, ?>> calculatedNodeSet = new HashSet<>();
+   private Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap = new HashMap<>();
+
+   private FinalParallelismSetter(StreamExecutionEnvironment env) {
+   this.env = env;
+   }
+
+   /**
+* Finding nodes that need to set final parallelism.
+*/
+   public static Map<ExecNode<?, ?>, Integer> calculate(StreamExecutionEnvironment env, List<ExecNode<?, ?>> sinkNodes) {
+   FinalParallelismSetter setter = new FinalParallelismSetter(env);
+   sinkNodes.forEach(setter::calculate);
+   return setter.finalParallelismNodeMap;
+   }
+
+   private void calculate(ExecNode execNode) {
+   if (!calculatedNodeSet.add(execNode)) {
+   return;
+   }
+   if (execNode instanceof BatchExecTableSourceScan) {
+   calculateTableSource((BatchExecTableSourceScan) execNode);
+   } else if (execNode instanceof StreamExecTableSourceScan) {
+   calculateTableSource((StreamExecTableSourceScan) execNode);
+   } else if (execNode instanceof BatchExecBoundedStreamScan) {
+   calculateBoundedStreamScan((BatchExecBoundedStreamScan) execNode);
+   } else if (execNode instanceof StreamExecDataStreamScan) {
+   calculateDataStreamScan((StreamExecDataStreamScan) execNode);
+   } else if (execNode instanceof Values) {
+   calculateValues(execNode);
+   } else {
+   calculateIfSingleton(execNode);
+   }
+   }
+
+   private void calculateTableSource(BatchExecTableSourceScan tableSourceScan) {
+   StreamTransformation transformation = tableSourceScan.getSourceTransformation(env);
+   if (transformation.getMaxParallelism() > 0) {
+   tableSourceScan.getResource().setMaxParallelism(transformation.getMaxParallelism());
 
 Review comment:
   It seems that the max parallelism of the source transformation is not used to avoid setting an invalid parallelism, like what we do for the sink?
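   
   For reference, a rough sketch of such a guard (illustration only, not an agreed fix; the `getResource()` accessor and field names are taken from the diff above, the clamping itself is an assumption):
   
   ```java
   private void calculateTableSource(BatchExecTableSourceScan tableSourceScan) {
     StreamTransformation transformation = tableSourceScan.getSourceTransformation(env);
     int maxParallelism = transformation.getMaxParallelism();
     if (maxParallelism > 0) {
       tableSourceScan.getResource().setMaxParallelism(maxParallelism);
       // mirror the sink-side check: never record a final parallelism above the max
       int parallelism = Math.min(transformation.getParallelism(), maxParallelism);
       if (parallelism > 0) {
         finalParallelismNodeMap.put(tableSourceScan, parallelism);
       }
     }
   }
   ```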




[GitHub] [flink] wuchong commented on a change in pull request #8718: [FLINK-12824][table-planner-blink] Set parallelism for stream SQL

2019-06-14 Thread GitBox
wuchong commented on a change in pull request #8718: 
[FLINK-12824][table-planner-blink] Set parallelism for stream SQL
URL: https://github.com/apache/flink/pull/8718#discussion_r293750904
 
 

 ##
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
 ##
 @@ -92,17 +93,15 @@ class BatchExecSink[T](
 }
 val sinkTransformation = dsSink.getTransformation
 
-if (sinkTransformation.getMaxParallelism > 0) {
-  sinkTransformation.setParallelism(sinkTransformation.getMaxParallelism)
-} else {
-  val configSinkParallelism = NodeResourceConfig.getSinkParallelism(
-tableEnv.getConfig.getConf)
-  if (configSinkParallelism > 0) {
-sinkTransformation.setParallelism(configSinkParallelism)
-  } else if (boundedStream.getParallelism > 0) {
-sinkTransformation.setParallelism(boundedStream.getParallelism)
-  }
+val configSinkParallelism = NodeResourceConfig.getSinkParallelism(
+  tableEnv.getConfig.getConf)
+
+val maxSinkParallelism = sinkTransformation.getMaxParallelism
+
+if (maxSinkParallelism > 0 && configSinkParallelism <= maxSinkParallelism) {
 
 Review comment:
   The sink parallelism only works when the max parallelism is set? And if the `configSinkParallelism` is -1, the sink parallelism will be set to -1?
   
   Maybe the logic should be changed to this:
   
   ```java
   // only set the user's parallelism when the user defines a sink parallelism
   if (configSinkParallelism > 0) {
     // apply it when it is not larger than the max parallelism, or when no max parallelism is set
     if (maxSinkParallelism < 0 || configSinkParallelism <= maxSinkParallelism) {
       sinkTransformation.setParallelism(configSinkParallelism)
     }
   }
   ```
   
   And we should add cases to cover this.
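   
   As a self-contained illustration of the decision table above (pure logic only, no Flink classes; the helper name `resolveSinkParallelism` is made up for this sketch), the cases to cover look like this:
   
   ```java
   /** Returns the parallelism to apply to the sink, or -1 to leave the sink transformation unchanged. */
   static int resolveSinkParallelism(int configSinkParallelism, int maxSinkParallelism) {
     // only apply the user's parallelism when one is configured
     if (configSinkParallelism > 0) {
       // and only when it does not exceed the max parallelism, or no max parallelism is set
       if (maxSinkParallelism < 0 || configSinkParallelism <= maxSinkParallelism) {
         return configSinkParallelism;
       }
     }
     return -1;
   }
   ```
   
   For example, `resolveSinkParallelism(-1, 4)` and `resolveSinkParallelism(8, 4)` leave the sink untouched, while `resolveSinkParallelism(2, 4)` and `resolveSinkParallelism(2, -1)` apply the configured value.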




[GitHub] [flink] HuangXingBo commented on issue #8732: [FLINK-12720][python] Add the Python Table API Sphinx docs

2019-06-14 Thread GitBox
HuangXingBo commented on issue #8732: [FLINK-12720][python] Add the Python 
Table API Sphinx docs
URL: https://github.com/apache/flink/pull/8732#issuecomment-502090131
 
 
   Thanks a lot, @dianfu @sunjincheng121.
   1. I added logic to lint-python.sh that checks whether Sphinx is installed.
   2. I added a generate-docs.sh that calls lint-python.sh to generate the Python API docs and copies them to the specified target path.




[GitHub] [flink] sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput

2019-06-14 Thread GitBox
sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] 
Implement the runtime handling of BoundedOneInput and BoundedMultiInput
URL: https://github.com/apache/flink/pull/8731#discussion_r293741881
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ##
 @@ -225,6 +226,21 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
 }
 }
 
+   /**
+* Starting from the second operator, go forward through the operator chain to notify
+* each operator that its input has ended.
+*
+* @throws Exception if some exception happens in the endInput function of an operator.
+*/
+   public void endOperatorInputs() throws Exception {
+   for (int i = allOperators.length - 2; i >= 0; i--) {
 
 Review comment:
   > I think you could simplify the code if you iterated through all operators in OperatorChain (not starting from the second one), check each one of them if they are instanceof BoundedOneInput and end them if they are.
   >
   > This would work for:
   >
   > - OneInputStreamTask - because all of them are/can be BoundedOneInput
   > - SourceStreamTask - because head will not be BoundedOneInput
   > - TwoInputStreamTask - because head will not be BoundedOneInput
   
   You are right. However, if chaining a non-head two-input operator is allowed in the future, this will cause problems. What do you think?
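   
   For what it's worth, a rough sketch of the simplified loop being discussed (assuming the `allOperators` field shown in the diff above; illustration only, not the final implementation):
   
   ```java
   public void endOperatorInputs() throws Exception {
     // iterate over every chained operator, head included; operators that do not
     // implement BoundedOneInput (e.g. the head of a SourceStreamTask or
     // TwoInputStreamTask) are simply skipped
     for (int i = allOperators.length - 1; i >= 0; i--) {
       StreamOperator<?> operator = allOperators[i];
       if (operator instanceof BoundedOneInput) {
         ((BoundedOneInput) operator).endInput();
       }
     }
   }
   ```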




[jira] [Commented] (FLINK-12776) Ambiguous content in flink-dist NOTICE file

2019-06-14 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16863997#comment-16863997
 ] 

sunjincheng commented on FLINK-12776:
-

Thanks for the reply [~Zentol]! I want to share the good news here that we can remove the old `flink-python`, so we do not need to make any changes. :) Please see the DISCUSS thread: 
[http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Deprecate-previous-Python-APIs-td29483.html]

> Ambiguous content in flink-dist NOTICE file
> ---
>
> Key: FLINK-12776
> URL: https://issues.apache.org/jira/browse/FLINK-12776
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Release System
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.9.0
>
> Attachments: image-2019-06-10-09-39-06-637.png
>
>
> With FLINK-12409 we include the new flink-python module in flink-dist. As a 
> result we now have 2 {{flink-python}} entries in the flink-dist NOTICE file, 
> one for the old batch API and one for the newly added one, which is 
> ambiguous. We should rectify this by either excluding the old batch API from 
> flink-dist, or rename the new module to something like {{flink-api-python}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] aljoscha commented on a change in pull request #8642: [FLINK-1722][datastream] Enable the InitializeOnMaster and FinalizeOnMaster interfaces on datastream

2019-06-14 Thread GitBox
aljoscha commented on a change in pull request #8642: [FLINK-1722][datastream] 
Enable the InitializeOnMaster and FinalizeOnMaster interfaces on datastream
URL: https://github.com/apache/flink/pull/8642#discussion_r293771876
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java
 ##
 @@ -47,6 +48,10 @@
} else if (operator instanceof StreamSource &&
((StreamSource) operator).getUserFunction() instanceof InputFormatSourceFunction) {
return new SimpleInputFormatOperatorFactory((StreamSource) operator);
+   } else if (operator instanceof StreamSink &&
+   ((StreamSink) operator).getUserFunction() instanceof OutputFormatSinkFunction) {
+   //noinspection unchecked
 
 Review comment:
   I think `noinspection` is only for IntelliJ; to properly suppress the warning here we have to use `@SuppressWarnings("unchecked")`.
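   
   A tiny self-contained illustration of the difference (not the actual `SimpleOperatorFactory` code):
   
   ```java
   final class SuppressionExample {
     // the annotation is honored by javac and every IDE, whereas a
     // "//noinspection unchecked" comment only silences IntelliJ
     @SuppressWarnings("unchecked")
     static <T> T uncheckedCast(Object value) {
       return (T) value; // would otherwise produce an unchecked-cast warning
     }
   }
   ```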



