[jira] [Commented] (FLINK-12380) Add thread name in the log4j.properties

2019-05-10 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837766#comment-16837766
 ] 

Yun Tang commented on FLINK-12380:
--

[~azagrebin] If so, why does the current 
[logback.xml|https://github.com/apache/flink/blob/a7cf24383be9f310fb5ccc5a032721421fa45791/flink-dist/src/main/flink-bin/conf/logback.xml#L24]
 already contain the thread name? 

I just replied to Stephan in the closed PR with the same confusion.
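
For reference, a minimal sketch of what the change could look like on the log4j side (hypothetical; the appender name and the rest of the conversion pattern are assumed to match what flink-dist ships in {{conf/log4j.properties}}, the only point being the {{%t}} thread specifier):

{code}
# Sketch: include the thread name (%t) in the file appender's layout
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] %-5p %-60c %x - %m%n
{code}

In logback the equivalent conversion word is {{%thread}}, which is presumably how the linked {{logback.xml}} pattern already exposes the thread (and thus the sub-task index).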



> Add thread name in the log4j.properties
> ---
>
> Key: FLINK-12380
> URL: https://issues.apache.org/jira/browse/FLINK-12380
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> This is inspired by FLINK-12368, where users wanted to add sub-task index 
> information in the source code. We could instead include the thread name, 
> which already contains the sub-task index, in the logs, avoiding the need to 
> change the source code.
> Moreover, I found that the existing {{logback.xml}} in Flink already contains 
> the {{thread name}} information. We should also add it to {{log4j.properties}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Myasuka commented on issue #8328: [FLINK-12380] Add thread name in the log4j.properties

2019-05-10 Thread GitBox
Myasuka commented on issue #8328: [FLINK-12380] Add thread name in the 
log4j.properties
URL: https://github.com/apache/flink/pull/8328#issuecomment-491482568
 
 
   @StephanEwen As the founder of Flink and its most prolific contributor, I 
believe in and respect your choice.
   However, your reply did not actually convince me. I hesitated about whether 
to voice my thoughts, but since Apache Flink is open source software, I believe 
every word counts.
   
   1. >  We kept it stable for a while now, which seems to have been generally 
appreciated by users.
   
   I cannot speak for other users, but we run what might be the largest Flink 
cluster in the world, and we add the thread name explicitly in `log4j.properties`.
   
   2. Actually, 
[logback.xml](https://github.com/apache/flink/blob/a7cf24383be9f310fb5ccc5a032721421fa45791/flink-dist/src/main/flink-bin/conf/logback.xml#L24)
 already contains the thread name.
   
   Last but not least, if your choice comes down to "even though `log4j` and 
`logback` have different log patterns, the logging setup in Flink has worked 
well so far, so we should not touch it without a strong demand", that would be 
acceptable to me, and I hope no further requests come up to add sub-task index 
information to the logs.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] XuQianJin-Stars commented on issue #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime

2019-05-10 Thread GitBox
XuQianJin-Stars commented on issue #8244: [FLINK-11945] [table-runtime-blink] 
Support over aggregation for blink streaming runtime
URL: https://github.com/apache/flink/pull/8244#issuecomment-491482291
 
 
   hi @wuchong Thank you very much, comments addressed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on issue #8317: [FLINK-12371] [table-planner-blink] Add support for converting (NOT) IN/ (NOT) EXISTS to semi-join/anti-join, and generating optimized logical plan

2019-05-10 Thread GitBox
godfreyhe commented on issue #8317: [FLINK-12371] [table-planner-blink] Add 
support for converting (NOT) IN/ (NOT) EXISTS to semi-join/anti-join, and 
generating optimized logical plan
URL: https://github.com/apache/flink/pull/8317#issuecomment-491481206
 
 
   @KurtYoung I have fixed the failing test cases; the remaining failure is in 
the `flink-yarn-tests` module.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-10 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r283083182
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final Map<ExecutionVertexID, DeploymentOption> deploymentOptions;
+
+   private final Map<IntermediateDataSetID, SchedulingIntermediateDataSet> intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.intermediateDataSets = new HashMap<>();
+   this.deploymentOptions = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new ArrayList<>();
+   DeploymentOption updateOption = new DeploymentOption(true);
+   DeploymentOption nonUpdateOption = new DeploymentOption(false);
+
+   for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   DeploymentOption option = nonUpdateOption;
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.addSchedulingResultPartition(srp);
+   if (srp.getPartitionType().isPipelined()) {
+   option = updateOption;
+   }
+   }
+   deploymentOptions.put(schedulingVertex.getId(), option);
+
+   if 
(schedulingVertex.getConsumedResultPartitions().isEmpty()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), option));
+   }
+   }
+
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void restartTasks(Set<ExecutionVertexID> verticesToRestart) {
+   // increase counter of the dataset first
+   for (ExecutionVertexID 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-10 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r283082959
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+
+/**
+ * Strategy test utilities.
+ */
+public class StrategyTestUtil {
+
+   static void initVerticesAndPartitions(
+   TestingSchedulingTopology schedulingTopology,
+   ResultPartitionType[] partitionTypes,
+   InputDependencyConstraint[] inputDependencyConstraints,
+   TestingSchedulingExecutionVertex[][] vertices,
+   TestingSchedulingResultPartition[][] partitions) {
+
+   ResultPartitionID[][] resultPartitionIds = new 
ResultPartitionID[vertices.length - 1][vertices[0].length];
+   initVerticesAndPartitions(schedulingTopology, partitionTypes, 
inputDependencyConstraints,
+   vertices, resultPartitionIds, partitions);
+   }
+
+   static void initVerticesAndPartitions(
+   TestingSchedulingTopology schedulingTopology,
+   ResultPartitionType[] partitionTypes,
+   InputDependencyConstraint[] inputDependencyConstraints,
+   TestingSchedulingExecutionVertex[][] vertices,
+   ResultPartitionID[][] resultPartitionIds,
+   TestingSchedulingResultPartition[][] partitions) {
+
+   final int jobVertexCnt = vertices.length;
+   final int taskCnt = vertices[0].length;
+
+   JobVertexID[] jobVertexIds = new JobVertexID[jobVertexCnt];
+   IntermediateDataSetID[] dataSetIds = new 
IntermediateDataSetID[jobVertexCnt];
+   for (int i = 0; i < jobVertexCnt; i++) {
+   jobVertexIds[i] = new JobVertexID();
+   dataSetIds[i] = new IntermediateDataSetID();
+   
schedulingTopology.addInputDependencyConstraint(jobVertexIds[i], 
inputDependencyConstraints[i]);
+   }
+
+   for (int i = 0; i < jobVertexCnt; i++) {
+   for (int j = 0; j < taskCnt; j++) {
+   vertices[i][j] = new 
TestingSchedulingExecutionVertex(jobVertexIds[i], j);
+   
schedulingTopology.addSchedulingExecutionVertex(vertices[i][j]);
+
+   if (i != jobVertexCnt - 1) {
+   partitions[i][j] = new 
TestingSchedulingResultPartition(dataSetIds[i], new 
IntermediateResultPartitionID(),
+   partitionTypes[i], 
vertices[i][j]);
+   resultPartitionIds[i][j] = new 
ResultPartitionID(partitions[i][j].getId(), new ExecutionAttemptID());
+   
schedulingTopology.addResultPartition(partitions[i][j].getId(), 
partitions[i][j]);
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-10 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r283082955
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java
 ##
 @@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.api.common.InputDependencyConstraint.ANY;
+import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
+import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
+import static 
org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.connectVerticesToPartition;
+import static 
org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.getExecutionVertexIdsFromDeployOptions;
+import static 
org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.initVerticesAndPartitions;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for {@link LazyFromSourcesSchedulingStrategy}.
+ */
+public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {
+
+   /**
+* Tests that when start scheduling lazy from sources scheduling 
strategy will start input vertices in scheduling topology.
+*/
+   @Test
+   public void testStartScheduling() {
+   final int jobVertexCnt = 2;
+   final int taskCnt = 3;
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
+   TestingSchedulingExecutionVertex[][] vertices = new 
TestingSchedulingExecutionVertex[jobVertexCnt][taskCnt];
+   TestingSchedulingResultPartition[][] partitions = new 
TestingSchedulingResultPartition[jobVertexCnt - 1][taskCnt];
+
+   ResultPartitionType[] partitionTypes = new 
ResultPartitionType[jobVertexCnt - 1];
+   InputDependencyConstraint[] inputDependencyConstraints = new 
InputDependencyConstraint[jobVertexCnt];
+   partitionTypes[0] = BLOCKING;
+   inputDependencyConstraints[0] = ANY;
+   inputDependencyConstraints[1] = ANY;
+
+   initVerticesAndPartitions(schedulingTopology, partitionTypes, 
inputDependencyConstraints, vertices, partitions);
+
+   for (int i = 0; i < taskCnt; i++) {
+   for (int j = 0; j < taskCnt; j++) {
+   connectVerticesToPartition(vertices[0][i], 
vertices[1][i], partitions[0][j]);
+   }
+   }
+
+   TestingSchedulerOperations testingSchedulerOperation = new 
TestingSchedulerOperations();
+   LazyFromSourcesSchedulingStrategy schedulingStrategy = new 
LazyFromSourcesSchedulingStrategy(
+   testingSchedulerOperation,
+   schedulingTopology);
+
+   schedulingStrategy.startScheduling();
+
+   Set<ExecutionVertexID> toBeScheduledVertices = Arrays.stream(vertices[0])
+   
.map(TestingSchedulingExecutionVertex::getId).collect(Collectors.toSet());
+   Collection<ExecutionVertexDeploymentOption> scheduledVertices = 
testingSchedulerOperation.getScheduledVertices().get(0);
+
+   
assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices),
+   containsInAnyOrder(toBeScheduledVertices.toArray()));
+   }
+
+   /**
+* Tests that when 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-10 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r283082635
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final Map<ExecutionVertexID, DeploymentOption> deploymentOptions;
+
+   private final Map<IntermediateDataSetID, SchedulingIntermediateDataSet> intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.intermediateDataSets = new HashMap<>();
+   this.deploymentOptions = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new ArrayList<>();
+   DeploymentOption updateOption = new DeploymentOption(true);
+   DeploymentOption nonUpdateOption = new DeploymentOption(false);
+
+   for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   DeploymentOption option = nonUpdateOption;
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.addSchedulingResultPartition(srp);
+   if (srp.getPartitionType().isPipelined()) {
+   option = updateOption;
+   }
+   }
+   deploymentOptions.put(schedulingVertex.getId(), option);
+
+   if 
(schedulingVertex.getConsumedResultPartitions().isEmpty()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), option));
+   }
+   }
+
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void restartTasks(Set<ExecutionVertexID> verticesToRestart) {
+   // increase counter of the dataset first
+   for (ExecutionVertexID 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-10 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r283082502
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final Map<ExecutionVertexID, DeploymentOption> deploymentOptions;
+
+   private final Map<IntermediateDataSetID, SchedulingIntermediateDataSet> intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.intermediateDataSets = new HashMap<>();
+   this.deploymentOptions = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new ArrayList<>();
+   DeploymentOption updateOption = new DeploymentOption(true);
+   DeploymentOption nonUpdateOption = new DeploymentOption(false);
+
+   for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   DeploymentOption option = nonUpdateOption;
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.addSchedulingResultPartition(srp);
+   if (srp.getPartitionType().isPipelined()) {
+   option = updateOption;
+   }
+   }
+   deploymentOptions.put(schedulingVertex.getId(), option);
+
+   if 
(schedulingVertex.getConsumedResultPartitions().isEmpty()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), option));
+   }
+   }
+
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void restartTasks(Set<ExecutionVertexID> verticesToRestart) {
+   // increase counter of the dataset first
+   for (ExecutionVertexID 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-10 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r283082509
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final Map<ExecutionVertexID, DeploymentOption> deploymentOptions;
+
+   private final Map<IntermediateDataSetID, SchedulingIntermediateDataSet> intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.intermediateDataSets = new HashMap<>();
+   this.deploymentOptions = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new ArrayList<>();
+   DeploymentOption updateOption = new DeploymentOption(true);
+   DeploymentOption nonUpdateOption = new DeploymentOption(false);
+
+   for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   DeploymentOption option = nonUpdateOption;
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.addSchedulingResultPartition(srp);
+   if (srp.getPartitionType().isPipelined()) {
+   option = updateOption;
+   }
+   }
+   deploymentOptions.put(schedulingVertex.getId(), option);
+
+   if 
(schedulingVertex.getConsumedResultPartitions().isEmpty()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), option));
+   }
+   }
+
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void restartTasks(Set<ExecutionVertexID> verticesToRestart) {
+   // increase counter of the dataset first
+   for (ExecutionVertexID 

[jira] [Commented] (FLINK-8513) Add documentation for connecting to non-AWS S3 endpoints

2019-05-10 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837692#comment-16837692
 ] 

Stephan Ewen commented on FLINK-8513:
-

Fixed in {{master}} in 4dae94d952958858af722c6285398bc6402708e4

> Add documentation for connecting to non-AWS S3 endpoints
> 
>
> Key: FLINK-8513
> URL: https://issues.apache.org/jira/browse/FLINK-8513
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, Documentation
>Reporter: chris snow
>Assignee: Seth Wiesman
>Priority: Trivial
>
> It would be useful if the documentation provided information on connecting to 
> non-AWS S3 endpoints when using presto.  For example:
>  
> 
> You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's 
> {{flink-conf.yaml}}:
> {code:java}
> s3.access-key: your-access-key 
> s3.secret-key: your-secret-key{code}
> If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object 
> Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 
> endpoint in Flink's {{flink-conf.yaml}}:
> {code:java}
> s3.endpoint: your-endpoint-hostname{code}
> 
>  
> Source: 
> [https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10249) Document hadoop/presto s3 file system configuration forwarding

2019-05-10 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837691#comment-16837691
 ] 

Stephan Ewen commented on FLINK-10249:
--

Fixed in {{master}} in 7d45b756b8bccbbacf6b7b9ca50a7ede8ed01a5c

> Document hadoop/presto s3 file system configuration forwarding
> --
>
> Key: FLINK-10249
> URL: https://issues.apache.org/jira/browse/FLINK-10249
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, Documentation
>Reporter: Andrey Zagrebin
>Assignee: Seth Wiesman
>Priority: Minor
>
> Flink's Hadoop and Presto S3 file system factories (S3FileSystemFactory) use 
> HadoopConfigLoader, which automatically converts and prefixes s3.* config 
> options to configure the underlying S3 clients. We should leave at least a hint 
> about this behaviour for users who want to change the configuration of these 
> underlying S3 clients, e.g. in docs/ops/deployment/aws.md
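
To illustrate the forwarding behaviour described above (a sketch only; the concrete 
target keys depend on the shaded client, e.g. the Hadoop-based {{flink-s3-fs-hadoop}} 
maps {{s3.*}} keys to {{fs.s3a.*}}):

{code}
# flink-conf.yaml (hypothetical values)
s3.endpoint: your-endpoint-hostname   # forwarded to the S3 client, e.g. as fs.s3a.endpoint
s3.path.style.access: true            # forwarded as well, e.g. as fs.s3a.path.style.access
{code}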



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12388) Update the production readiness checklist

2019-05-10 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837690#comment-16837690
 ] 

Stephan Ewen commented on FLINK-12388:
--

Fixed in {{master}} in 754cd71dd92dfcfff3e1ef23083790422188ce9e

> Update the production readiness checklist 
> --
>
> Key: FLINK-12388
> URL: https://issues.apache.org/jira/browse/FLINK-12388
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The production readiness checklist has grown organically over the years, and 
> while it provides valuable information, the content does not flow cohesively 
> as it has been worked on by a number of users. 
> We should improve the overall structure and readability of the checklist and 
> also update any outdated information. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12070) Make blocking result partitions consumable multiple times

2019-05-10 Thread Stephan Ewen (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-12070.


> Make blocking result partitions consumable multiple times
> -
>
> Key: FLINK-12070
> URL: https://issues.apache.org/jira/browse/FLINK-12070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Till Rohrmann
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: image-2019-04-18-17-38-24-949.png
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In order to avoid writing produced results multiple times for multiple 
> consumers, and in order to speed up batch recoveries, we should make blocking 
> result partitions consumable multiple times. At the moment a blocking result 
> partition is released once its consumers have processed all data. Instead, the 
> result partition should be released once the next blocking result has been 
> produced and all consumers of the blocking result partition have terminated. 
> Moreover, blocking results should not hold on to slot resources like network 
> buffers or memory, as is currently the case with {{SpillableSubpartitions}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12378) Consolidate FileSystem Documentation

2019-05-10 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837694#comment-16837694
 ] 

Stephan Ewen commented on FLINK-12378:
--

Fixed in {{master}} in b50896a55288b363fd369368c0823abbe99aa36c

> Consolidate FileSystem Documentation
> 
>
> Key: FLINK-12378
> URL: https://issues.apache.org/jira/browse/FLINK-12378
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, FileSystems
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently flink's filesystem documentation is spread across a number of pages 
> without any clear connection. A non-exhaustive list of issues includes: 
> * S3 documentation spread across many pages
> * OSS filesystem is listed under deployments when it is an object store
> * deployments/filesystem.md has a lot of unrelated information
> We should create a filesystem subsection under deployments with multiple 
> pages containing all relevant information about Flink's filesystem 
> abstraction. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12378) Consolidate FileSystem Documentation

2019-05-10 Thread Stephan Ewen (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-12378:
-
Fix Version/s: 1.8.1
   1.9.0

> Consolidate FileSystem Documentation
> 
>
> Key: FLINK-12378
> URL: https://issues.apache.org/jira/browse/FLINK-12378
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, FileSystems
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently flink's filesystem documentation is spread across a number of pages 
> without any clear connection. A non-exhaustive list of issues includes: 
> * S3 documentation spread across many pages
> * OSS filesystem is listed under deployments when it is an object store
> * deployments/filesystem.md has a lot of unrelated information
> We should create a filesystem subsection under deployments with multiple 
> pages containing all relevant information about Flink's filesystem 
> abstraction. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8513) Add documentation for connecting to non-AWS S3 endpoints

2019-05-10 Thread Stephan Ewen (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-8513:

Fix Version/s: 1.8.1
   1.9.0

> Add documentation for connecting to non-AWS S3 endpoints
> 
>
> Key: FLINK-8513
> URL: https://issues.apache.org/jira/browse/FLINK-8513
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, Documentation
>Reporter: chris snow
>Assignee: Seth Wiesman
>Priority: Trivial
> Fix For: 1.9.0, 1.8.1
>
>
> It would be useful if the documentation provided information on connecting to 
> non-AWS S3 endpoints when using presto.  For example:
>  
> 
> You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's 
> {{flink-conf.yaml}}:
> {code:java}
> s3.access-key: your-access-key 
> s3.secret-key: your-secret-key{code}
> If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object 
> Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 
> endpoint in Flink's {{flink-conf.yaml}}:
> {code:java}
> s3.endpoint: your-endpoint-hostname{code}
> 
>  
> Source: 
> [https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-12070) Make blocking result partitions consumable multiple times

2019-05-10 Thread Stephan Ewen (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-12070.
--
   Resolution: Implemented
 Assignee: Stephan Ewen  (was: BoWang)
Fix Version/s: 1.9.0

Implemented in a7cf24383be9f310fb5ccc5a032721421fa45791

> Make blocking result partitions consumable multiple times
> -
>
> Key: FLINK-12070
> URL: https://issues.apache.org/jira/browse/FLINK-12070
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Till Rohrmann
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: image-2019-04-18-17-38-24-949.png
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In order to avoid writing produced results multiple times for multiple 
> consumers, and in order to speed up batch recoveries, we should make blocking 
> result partitions consumable multiple times. At the moment a blocking result 
> partition is released once its consumers have processed all data. Instead, the 
> result partition should be released once the next blocking result has been 
> produced and all consumers of the blocking result partition have terminated. 
> Moreover, blocking results should not hold on to slot resources like network 
> buffers or memory, as is currently the case with {{SpillableSubpartitions}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12378) Consolidate FileSystem Documentation

2019-05-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12378:
---
Labels: pull-request-available  (was: )

> Consolidate FileSystem Documentation
> 
>
> Key: FLINK-12378
> URL: https://issues.apache.org/jira/browse/FLINK-12378
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, FileSystems
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
>
> Currently flink's filesystem documentation is spread across a number of pages 
> without any clear connection. A non-exhaustive list of issues includes: 
> * S3 documentation spread across many pages
> * OSS filesystem is listed under deployments when it is an object store
> * deployments/filesystem.md has a lot of unrelated information
> We should create a filesystem subsection under deployments with multiple 
> pages containing all relevant information about Flink's filesystem 
> abstraction. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] asfgit closed pull request #8326: [FLINK-12378][docs] Consolidate FileSystem Documentation

2019-05-10 Thread GitBox
asfgit closed pull request #8326: [FLINK-12378][docs] Consolidate FileSystem 
Documentation
URL: https://github.com/apache/flink/pull/8326
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen commented on issue #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions

2019-05-10 Thread GitBox
StephanEwen commented on issue #8290: [FLINK-12070] [network] Implement new 
bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290#issuecomment-491457613
 
 
   Manually merged in a7cf24383be9f310fb5ccc5a032721421fa45791


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] asfgit closed pull request #8330: [FLINK-12388][docs] Update the production readiness checklist

2019-05-10 Thread GitBox
asfgit closed pull request #8330: [FLINK-12388][docs] Update the production 
readiness checklist
URL: https://github.com/apache/flink/pull/8330
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen closed pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions

2019-05-10 Thread GitBox
StephanEwen closed pull request #8290: [FLINK-12070] [network] Implement new 
bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-10 Thread GitBox
bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283063574
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.CatalogAlreadyExistsException;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.table.api.ExternalCatalogAlreadyExistException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.operations.CatalogTableOperation;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A CatalogManager that encapsulates all available catalogs. It also 
implements the logic of
+ * table path resolution. Supports both new API ({@link ReadableCatalog} as 
well as {@link ExternalCatalog}.
+ */
+@Internal
+public class CatalogManager {
+   private static final Logger LOG = 
LoggerFactory.getLogger(CatalogManager.class);
+
+   // A map between names and catalogs.
+   private Map<String, Catalog> catalogs;
+
+   // TO BE REMOVED along with ExternalCatalog API
+   private Map<String, ExternalCatalog> externalCatalogs;
+
+   // The name of the default catalog and schema
+   private String currentCatalogName;
+
+   private String currentDatabaseName;
+
+   public CatalogManager(String defaultCatalogName, Catalog 
defaultCatalog) {
+   catalogs = new LinkedHashMap<>();
+   externalCatalogs = new LinkedHashMap<>();
+   catalogs.put(defaultCatalogName, defaultCatalog);
+   this.currentCatalogName = defaultCatalogName;
+   this.currentDatabaseName = defaultCatalog.getCurrentDatabase();
+   }
+
+   /**
+* Registers a catalog under the given name. The catalog name must be 
unique across both
+* {@link Catalog}s and {@link ExternalCatalog}s.
+*
+* @param catalogName name under which to register the given catalog
+* @param catalog catalog to register
+* @throws CatalogAlreadyExistsException thrown if the name is already 
taken
+*/
+   public void registerCatalog(String catalogName, Catalog catalog) throws 
CatalogAlreadyExistsException {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"Catalog name cannot be null or empty.");
+   checkNotNull(catalog, "Catalog cannot be null");
+
+   if (catalogs.containsKey(catalogName) || 
externalCatalogs.containsKey(catalogName)) {
+   throw new CatalogAlreadyExistsException(catalogName);
+   }
+
+   catalogs.put(catalogName, catalog);
+   catalog.open();
+   }
+
+   /**
+* Gets a catalog by name.
+*
+* @param catalogName name of the catalog to retrieve
+* @return the requested catalog
+* @throws CatalogNotExistException thrown if the catalog doesn't exist
+* @see CatalogManager#getExternalCatalog(String)
+*/
+   public Catalog getCatalog(String catalogName) throws 
CatalogNotExistException {
+   if (!catalogs.keySet().contains(catalogName)) {
+   throw new CatalogNotExistException(catalogName);
+   }
+
+   return catalogs.get(catalogName);
+   }
+
+   /**
+* Registers an external catalog under the given 

[GitHub] [flink] bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-10 Thread GitBox
bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283062024
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.CatalogAlreadyExistsException;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.table.api.ExternalCatalogAlreadyExistException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.operations.CatalogTableOperation;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A CatalogManager that encapsulates all available catalogs. It also 
implements the logic of
+ * table path resolution. Supports both new API ({@link ReadableCatalog} as 
well as {@link ExternalCatalog}.
 
 Review comment:
   "Catalog", not "ReadableCatalog"


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-10 Thread GitBox
bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283060689
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -51,7 +51,7 @@
 
public static final String DEFAULT_DB = "default";
 
-   private String currentDatabase = DEFAULT_DB;
+   private String currentDatabase;
 
 Review comment:
   can you rebase the in-memory catalog to 
https://github.com/apache/flink/pull/8390 ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-10 Thread GitBox
bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283062301
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.CatalogAlreadyExistsException;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.table.api.ExternalCatalogAlreadyExistException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.operations.CatalogTableOperation;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A CatalogManager that encapsulates all available catalogs. It also 
implements the logic of
+ * table path resolution. Supports both new API ({@link ReadableCatalog} as 
well as {@link ExternalCatalog}.
+ */
+@Internal
+public class CatalogManager {
+   private static final Logger LOG = 
LoggerFactory.getLogger(CatalogManager.class);
+
+   // A map between names and catalogs.
+   private Map<String, Catalog> catalogs;
+
+   // TO BE REMOVED along with ExternalCatalog API
+   private Map<String, ExternalCatalog> externalCatalogs;
+
+   // The name of the default catalog and schema
+   private String currentCatalogName;
+
+   private String currentDatabaseName;
+
+   public CatalogManager(String defaultCatalogName, Catalog 
defaultCatalog) {
 
 Review comment:
   validate defaultCatalogName and defaultCatalog?
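   A minimal sketch of what such validation could look like, reusing the `checkArgument`/`checkNotNull` helpers and `StringUtils` already imported in the quoted class (the messages are illustrative only):

```java
public CatalogManager(String defaultCatalogName, Catalog defaultCatalog) {
	checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultCatalogName),
		"Default catalog name cannot be null or empty.");
	checkNotNull(defaultCatalog, "Default catalog cannot be null.");

	catalogs = new LinkedHashMap<>();
	externalCatalogs = new LinkedHashMap<>();
	catalogs.put(defaultCatalogName, defaultCatalog);
	this.currentCatalogName = defaultCatalogName;
	this.currentDatabaseName = defaultCatalog.getCurrentDatabase();
}
```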




[GitHub] [flink] bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-10 Thread GitBox
bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283062652
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.CatalogAlreadyExistsException;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.table.api.ExternalCatalogAlreadyExistException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.operations.CatalogTableOperation;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A CatalogManager that encapsulates all available catalogs. It also 
implements the logic of
+ * table path resolution. Supports both new API ({@link ReadableCatalog} as 
well as {@link ExternalCatalog}.
+ */
+@Internal
+public class CatalogManager {
+   private static final Logger LOG = 
LoggerFactory.getLogger(CatalogManager.class);
+
+   // A map between names and catalogs.
+   private Map<String, Catalog> catalogs;
+
+   // TO BE REMOVED along with ExternalCatalog API
+   private Map<String, ExternalCatalog> externalCatalogs;
+
+   // The name of the default catalog and schema
+   private String currentCatalogName;
+
+   private String currentDatabaseName;
+
+   public CatalogManager(String defaultCatalogName, Catalog 
defaultCatalog) {
+   catalogs = new LinkedHashMap<>();
+   externalCatalogs = new LinkedHashMap<>();
+   catalogs.put(defaultCatalogName, defaultCatalog);
+   this.currentCatalogName = defaultCatalogName;
+   this.currentDatabaseName = defaultCatalog.getCurrentDatabase();
+   }
+
+   /**
+* Registers a catalog under the given name. The catalog name must be 
unique across both
+* {@link Catalog}s and {@link ExternalCatalog}s.
+*
+* @param catalogName name under which to register the given catalog
+* @param catalog catalog to register
+* @throws CatalogAlreadyExistsException thrown if the name is already 
taken
+*/
+   public void registerCatalog(String catalogName, Catalog catalog) throws 
CatalogAlreadyExistsException {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"Catalog name cannot be null or empty.");
+   checkNotNull(catalog, "Catalog cannot be null");
+
+   if (catalogs.containsKey(catalogName) || 
externalCatalogs.containsKey(catalogName)) {
+   throw new CatalogAlreadyExistsException(catalogName);
+   }
+
+   catalogs.put(catalogName, catalog);
+   catalog.open();
+   }
+
+   /**
+* Gets a catalog by name.
+*
+* @param catalogName name of the catalog to retrieve
+* @return the requested catalog
+* @throws CatalogNotExistException thrown if the catalog doesn't exist
+* @see CatalogManager#getExternalCatalog(String)
+*/
+   public Catalog getCatalog(String catalogName) throws 
CatalogNotExistException {
+   if (!catalogs.keySet().contains(catalogName)) {
+   throw new CatalogNotExistException(catalogName);
+   }
+
+   return catalogs.get(catalogName);
+   }
+
+   /**
+* Registers an external catalog under the given 

[GitHub] [flink] bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-10 Thread GitBox
bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283059466
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
 ##
 @@ -55,19 +57,46 @@
 * Registers an {@link ExternalCatalog} under a unique name in the 
TableEnvironment's schema.
 * All tables registered in the {@link ExternalCatalog} can be accessed.
 *
-* @param nameThe name under which the externalCatalog will 
be registered
+* @param name The name under which the externalCatalog will be 
registered
 * @param externalCatalog The externalCatalog to register
+* @see TableEnvironment#getCatalog(String)
+* @see TableEnvironment#registerCatalog(String, Catalog)
+* @deprecated the {@link ExternalCatalog} API is deprecated. Use the 
corresponding {@link Catalog} API.
 */
+   @Deprecated
void registerExternalCatalog(String name, ExternalCatalog 
externalCatalog);
 
/**
 * Gets a registered {@link ExternalCatalog} by name.
 *
 * @param name The name to look up the {@link ExternalCatalog}
 * @return The {@link ExternalCatalog}
+* @see TableEnvironment#getCatalog(String)
+* @see TableEnvironment#registerCatalog(String, Catalog)
+* @deprecated the {@link ExternalCatalog} API is deprecated. Use the 
corresponding {@link Catalog} API.
 */
+   @Deprecated
ExternalCatalog getRegisteredExternalCatalog(String name);
 
+   /**
+* Registers a {@link Catalog} under a unique name.
+* All tables registered in the {@link Catalog} can be accessed.
+*
+* @param name the name under which the catalog will be registered
+* @param catalog the catalog to register
+* @throws CatalogAlreadyExistsException thrown if catalog with given 
name already exists
+*/
+   void registerCatalog(String name, Catalog catalog) throws 
CatalogAlreadyExistsException;
+
+   /**
+* Gets a registered {@link Catalog} by name.
+*
+* @param catalogName The name to look up the {@link Catalog}
 
 Review comment:
   nit: "**the** name ..."




[GitHub] [flink] bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-10 Thread GitBox
bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283063171
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.CatalogAlreadyExistsException;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.table.api.ExternalCatalogAlreadyExistException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.operations.CatalogTableOperation;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A CatalogManager that encapsulates all available catalogs. It also 
implements the logic of
+ * table path resolution. Supports both new API ({@link ReadableCatalog} as 
well as {@link ExternalCatalog}.
+ */
+@Internal
+public class CatalogManager {
+   private static final Logger LOG = 
LoggerFactory.getLogger(CatalogManager.class);
+
+   // A map between names and catalogs.
+   private Map<String, Catalog> catalogs;
+
+   // TO BE REMOVED along with ExternalCatalog API
+   private Map<String, ExternalCatalog> externalCatalogs;
+
+   // The name of the default catalog and schema
+   private String currentCatalogName;
+
+   private String currentDatabaseName;
+
+   public CatalogManager(String defaultCatalogName, Catalog 
defaultCatalog) {
+   catalogs = new LinkedHashMap<>();
+   externalCatalogs = new LinkedHashMap<>();
+   catalogs.put(defaultCatalogName, defaultCatalog);
+   this.currentCatalogName = defaultCatalogName;
+   this.currentDatabaseName = defaultCatalog.getCurrentDatabase();
+   }
+
+   /**
+* Registers a catalog under the given name. The catalog name must be 
unique across both
+* {@link Catalog}s and {@link ExternalCatalog}s.
+*
+* @param catalogName name under which to register the given catalog
+* @param catalog catalog to register
+* @throws CatalogAlreadyExistsException thrown if the name is already 
taken
+*/
+   public void registerCatalog(String catalogName, Catalog catalog) throws 
CatalogAlreadyExistsException {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"Catalog name cannot be null or empty.");
+   checkNotNull(catalog, "Catalog cannot be null");
+
+   if (catalogs.containsKey(catalogName) || 
externalCatalogs.containsKey(catalogName)) {
+   throw new CatalogAlreadyExistsException(catalogName);
+   }
+
+   catalogs.put(catalogName, catalog);
+   catalog.open();
+   }
+
+   /**
+* Gets a catalog by name.
+*
+* @param catalogName name of the catalog to retrieve
+* @return the requested catalog
+* @throws CatalogNotExistException thrown if the catalog doesn't exist
+* @see CatalogManager#getExternalCatalog(String)
+*/
+   public Catalog getCatalog(String catalogName) throws 
CatalogNotExistException {
+   if (!catalogs.keySet().contains(catalogName)) {
+   throw new CatalogNotExistException(catalogName);
+   }
+
+   return catalogs.get(catalogName);
+   }
+
+   /**
+* Registers an external catalog under the given 

[GitHub] [flink] bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-10 Thread GitBox
bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283061626
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A mapping between Flink's catalog and Calcite's schema. This enables to 
look up and access tables
 
 Review comment:
   "...access tables **and views** in SQL queries without registering **them** 
in advance"?




[GitHub] [flink] bowenli86 commented on issue #8390: [FLINK-12469][table] Clean up catalog API on default/current database

2019-05-10 Thread GitBox
bowenli86 commented on issue #8390: [FLINK-12469][table] Clean up catalog API 
on default/current database
URL: https://github.com/apache/flink/pull/8390#issuecomment-491445937
 
 
   +1
   
   @dawidwys can you take another look? If you don't have other comments, I'll 
merge this once CI is green.




[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-10 Thread GitBox
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] 
Support table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r283056348
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -89,6 +101,34 @@ private static IMetaStoreClient 
getMetastoreClient(HiveConf hiveConf) {
}
}
 
+   // -- APIs --
+
+   /**
+* Validate input base table.
+*
+* @param catalogBaseTable the base table to be validated
+* @throws IllegalArgumentException thrown if the input base table is 
invalid.
+*/
+   protected abstract void validateCatalogBaseTable(CatalogBaseTable 
catalogBaseTable)
 
 Review comment:
   Makes sense. Though IllegalArgumentException can be thrown when any part of 
the input table is invalid, a better signature may just be throwing 
CatalogException to conform with that of the other existing catalog APIs, since 
we've defined CatalogException's use case as "in case of any runtime exception".
   
   I don't think returning boolean is a good signature, as the check can be 
fairly complicated and the caller won't know which part is invalid. Returning 
boolean would be a good option for a single-criterion check like "bool 
isTable()".




[GitHub] [flink] xuefuz commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database

2019-05-10 Thread GitBox
xuefuz commented on a change in pull request #8390: [FLINK-12469][table] Clean 
up catalog API on default/current database
URL: https://github.com/apache/flink/pull/8390#discussion_r283047530
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -48,23 +48,27 @@
  */
 public abstract class HiveCatalogBase implements Catalog {
private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalogBase.class);
-
-   public static final String DEFAULT_DB = "default";
+   private static final String DEFAULT_DB = "default";
 
protected final String catalogName;
protected final HiveConf hiveConf;
 
-   protected String currentDatabase = DEFAULT_DB;
+   private final String defaultDatabase;
protected IMetaStoreClient client;
 
public HiveCatalogBase(String catalogName, String hivemetastoreURI) {
-   this(catalogName, getHiveConf(hivemetastoreURI));
+   this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
}
 
public HiveCatalogBase(String catalogName, HiveConf hiveConf) {
+   this(catalogName, DEFAULT_DB, hiveConf);
+   }
+
+   public HiveCatalogBase(String catalogName, String defaultDatabase, 
HiveConf hiveConf) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"catalogName cannot be null or empty");
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), 
"defaultDatabase cannot be null or empty");
this.catalogName = catalogName;
-
+   this.defaultDatabase = checkNotNull(defaultDatabase, 
"defaultDatabase cannot be null");
 
 Review comment:
   Adopted suggestion.




[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-10 Thread GitBox
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] 
Support table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r283039874
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -363,4 +332,23 @@ public CatalogColumnStatistics 
getPartitionColumnStatistics(ObjectPath tablePath
throw new UnsupportedOperationException();
}
 
+   // -- utils --
+
+   /**
+* Filter out Hive-created properties, and return Flink-created 
properties.
+*/
+   private Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
 
 Review comment:
   Makes sense to me. Fixed.
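   A hedged sketch of the kind of filtering the quoted Javadoc describes; the `FLINK_PROPERTY_PREFIX` marker below is made up for illustration and not necessarily the convention used by the actual catalog:

```java
private static final String FLINK_PROPERTY_PREFIX = "flink.";  // illustrative marker only

/** Filter out Hive-created properties and return only the Flink-created ones. */
private static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
	return hiveTableParams.entrySet().stream()
		.filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
		.collect(java.util.stream.Collectors.toMap(
			e -> e.getKey().substring(FLINK_PROPERTY_PREFIX.length()),
			Map.Entry::getValue));
}
```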




[GitHub] [flink] bowenli86 commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database

2019-05-10 Thread GitBox
bowenli86 commented on a change in pull request #8390: [FLINK-12469][table] 
Clean up catalog API on default/current database
URL: https://github.com/apache/flink/pull/8390#discussion_r283037447
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -48,23 +48,27 @@
  */
 public abstract class HiveCatalogBase implements Catalog {
private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalogBase.class);
-
-   public static final String DEFAULT_DB = "default";
+   private static final String DEFAULT_DB = "default";
 
protected final String catalogName;
protected final HiveConf hiveConf;
 
-   protected String currentDatabase = DEFAULT_DB;
+   private final String defaultDatabase;
protected IMetaStoreClient client;
 
public HiveCatalogBase(String catalogName, String hivemetastoreURI) {
-   this(catalogName, getHiveConf(hivemetastoreURI));
+   this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
}
 
public HiveCatalogBase(String catalogName, HiveConf hiveConf) {
+   this(catalogName, DEFAULT_DB, hiveConf);
+   }
+
+   public HiveCatalogBase(String catalogName, String defaultDatabase, 
HiveConf hiveConf) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"catalogName cannot be null or empty");
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), 
"defaultDatabase cannot be null or empty");
this.catalogName = catalogName;
-
+   this.defaultDatabase = checkNotNull(defaultDatabase, 
"defaultDatabase cannot be null");
 
 Review comment:
   Since we already have `!StringUtils.isNullOrWhitespaceOnly(defaultDatabase)` 
above, shall we remove the `checkNotNull`?
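   For illustration, the constructor body after dropping the redundant null check could simply be (sketch only):

```java
checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase),
	"defaultDatabase cannot be null or empty");

this.catalogName = catalogName;
this.defaultDatabase = defaultDatabase; // checkNotNull would be redundant here
```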




[GitHub] [flink] bowenli86 commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database

2019-05-10 Thread GitBox
bowenli86 commented on a change in pull request #8390: [FLINK-12469][table] 
Clean up catalog API on default/current database
URL: https://github.com/apache/flink/pull/8390#discussion_r283036856
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -65,9 +64,15 @@
private final Map> partitionColumnStats;
 
public GenericInMemoryCatalog(String name) {
+   this(name, DEFAULT_DB);
+   }
+
+   public GenericInMemoryCatalog(String name, String defaultDatabase) {
 
 Review comment:
   +1, I like Xuefu's approach as it conforms more to yaml config. 




[GitHub] [flink] azagrebin commented on issue #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-10 Thread GitBox
azagrebin commented on issue #8416: [FLINK-12331] Introduce partition/gate 
setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#issuecomment-491418905
 
 
   @flinkbot attention @zhijiangW




[GitHub] [flink] flinkbot commented on issue #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-10 Thread GitBox
flinkbot commented on issue #8416: [FLINK-12331] Introduce partition/gate setup 
to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#issuecomment-491418683
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] azagrebin opened a new pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-10 Thread GitBox
azagrebin opened a new pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416
 
 
   ## What is the purpose of the change
   
   Introduce partition/gate setup to decouple task registration with 
NetworkEnvironment.
   The PR also does some preparation refactoring.
   
   ## Brief change log
   
 - Introduce factories for ResultPartition and SingleInputGate
 - Introduce buffer pool factories for ResultPartition and SingleInputGate
 - Introduce MemorySegmentProvider for RemoteInputChannel to assign 
exclusive segments
 - Refactor NetworkEnvironment#setupXXX() to ResultPartitionWriter#setup() 
and InputGate#setup()
   
   ## Verifying this change
   
   existing unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
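
   A rough sketch of the intent behind the change log above; the interfaces here are simplified placeholders for illustration, not the actual Flink classes:

```java
import java.io.IOException;
import java.util.List;

interface ResultPartitionWriter { void setup() throws IOException; }

interface InputGate { void setup() throws IOException; }

class TaskSetupSketch {
	// Each partition/gate acquires its own resources (e.g. buffer pool) in setup(),
	// so the task no longer has to be registered with the NetworkEnvironment as a whole.
	static void setupNetworkResources(
			List<ResultPartitionWriter> partitions,
			List<InputGate> gates) throws IOException {
		for (ResultPartitionWriter partition : partitions) {
			partition.setup();
		}
		for (InputGate gate : gates) {
			gate.setup();
		}
	}
}
```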
   




[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-10 Thread GitBox
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283025643
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
 ##
 @@ -40,63 +40,67 @@ object ExternalTableUtil extends Logging {
 * @param externalTable the [[ExternalCatalogTable]] instance which to 
convert
 * @return converted [[TableSourceTable]] instance from the input catalog 
table
 */
-  def fromExternalCatalogTable[T1, T2](
-  tableEnv: TableEnvironment,
-  externalTable: ExternalCatalogTable)
+  def fromExternalCatalogTable[T1, T2](isBatch: Boolean, externalTable: 
ExternalCatalogTable)
 : TableSourceSinkTable[T1, T2] = {
 
 val statistics = new FlinkStatistic(toScala(externalTable.getTableStats))
 
 val source: Option[TableSourceTable[T1]] = if 
(externalTable.isTableSource) {
-  Some(createTableSource(tableEnv, externalTable, statistics))
+  Some(createTableSource(isBatch, externalTable, statistics))
 } else {
   None
 }
 
 val sink: Option[TableSinkTable[T2]] = if (externalTable.isTableSink) {
-  Some(createTableSink(tableEnv, externalTable, statistics))
+  Some(createTableSink(isBatch, externalTable, statistics))
 } else {
   None
 }
 
 new TableSourceSinkTable[T1, T2](source, sink)
   }
 
+  def getTableSchema(externalTable: ExternalCatalogTable) : TableSchema = {
+if (externalTable.isTableSource) {
+  
TableFactoryUtil.findAndCreateTableSource[Any](externalTable).getTableSchema
+} else {
+  val tableSink = TableFactoryUtil.findAndCreateTableSink(externalTable)
+  new TableSchema(tableSink.getFieldNames, tableSink.getFieldTypes)
+}
+  }
+
   private def createTableSource[T](
-  tableEnv: TableEnvironment,
+  isBatch: Boolean,
   externalTable: ExternalCatalogTable,
   statistics: FlinkStatistic)
-: TableSourceTable[T] = tableEnv match {
-
-case _: BatchTableEnvImpl if externalTable.isBatchTable =>
+: TableSourceTable[T] = {
+if (isBatch && externalTable.isBatchTable) {
   val source = TableFactoryUtil.findAndCreateTableSource(externalTable)
   new BatchTableSourceTable[T](source.asInstanceOf[BatchTableSource[T]], 
statistics)
-
-case _: StreamTableEnvImpl if externalTable.isStreamTable =>
+} else if (!isBatch && externalTable.isStreamTable) {
   val source = TableFactoryUtil.findAndCreateTableSource(externalTable)
   new StreamTableSourceTable[T](source.asInstanceOf[StreamTableSource[T]], 
statistics)
-
-case _ =>
+} else {
   throw new ValidationException(
 "External catalog table does not support the current environment for a 
table source.")
 
 Review comment:
   This message might need to change because it covers two cases: isBatch==true 
&& externalTable.isStreamTable, and isBatch==false && 
externalTable.isBatchTable.




[jira] [Commented] (FLINK-10520) Job save points REST API fails unless parameters are specified

2019-05-10 Thread Tim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837525#comment-16837525
 ] 

Tim commented on FLINK-10520:
-

Is there any update on this issue?   

> Job save points REST API fails unless parameters are specified
> --
>
> Key: FLINK-10520
> URL: https://issues.apache.org/jira/browse/FLINK-10520
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.6.1
>Reporter: Elias Levy
>Assignee: Chesnay Schepler
>Priority: Minor
>
> The new REST API POST endpoint, {{/jobs/:jobid/savepoints}}, returns an error 
> unless the request includes a body with all parameters ({{target-directory}} 
> and {{cancel-job}}), even though the 
> [documentation|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.html]
>  suggests these are optional.
> If a POST request with no data is made, the response is a 400 status code 
> with the error message "Bad request received."
> If the POST request submits an empty JSON object ( {} ), the response is a 
> 400 status code with the error message "Request did not match expected format 
> SavepointTriggerRequestBody."  The same is true if only the 
> {{target-directory}} or {{cancel-job}} parameters are included.
> As the system is configured with a default savepoint location, there 
> shouldn't be a need to include the parameter in the request.
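
A hedged sketch of a trigger call that satisfies the handler today by spelling out both fields (host, port, job id and target directory are placeholders; the field names are taken from the description above):

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class TriggerSavepointExample {
	public static void main(String[] args) throws Exception {
		// Placeholder job id and REST endpoint.
		String jobId = "00000000000000000000000000000000";
		URL url = new URL("http://localhost:8081/jobs/" + jobId + "/savepoints");

		HttpURLConnection conn = (HttpURLConnection) url.openConnection();
		conn.setRequestMethod("POST");
		conn.setRequestProperty("Content-Type", "application/json");
		conn.setDoOutput(true);

		// Both fields spelled out, as the handler currently requires.
		String body = "{\"target-directory\": \"hdfs:///savepoints\", \"cancel-job\": false}";
		try (OutputStream out = conn.getOutputStream()) {
			out.write(body.getBytes(StandardCharsets.UTF_8));
		}

		System.out.println("Response code: " + conn.getResponseCode());
	}
}
```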





[jira] [Commented] (FLINK-7697) Add metrics for Elasticsearch Sink

2019-05-10 Thread Piyush Goyal (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837512#comment-16837512
 ] 

Piyush Goyal commented on FLINK-7697:
-

+1 on adding metrics to ES Sink. 

A few other metrics that would be useful:
 * Number of failed index requests (tagged with failure reasons).
 * End-to-end latency for bulk requests.

Is anyone working on this currently? We (at Netflix) are starting to add these 
metrics to our private fork. If no one else has started working on it, we'll be 
happy to contribute it.

> Add metrics for Elasticsearch Sink
> --
>
> Key: FLINK-7697
> URL: https://issues.apache.org/jira/browse/FLINK-7697
> Project: Flink
>  Issue Type: Wish
>  Components: Connectors / ElasticSearch
>Reporter: Hai Zhou
>Assignee: xueyu
>Priority: Major
>
> We should add metrics to track events written to ElasticsearchSink,
> e.g.:
> * number of successful bulk sends
> * number of documents inserted
> * number of documents updated
> * number of documents version conflicts
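
A hedged sketch of how such counters could be wired through Flink's metric group in a sink (generic pattern only, not the ElasticsearchSink implementation; the metric names are illustrative):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class InstrumentedSink extends RichSinkFunction<String> {

	private transient Counter successfulBulkSends;
	private transient Counter failedIndexRequests;

	@Override
	public void open(Configuration parameters) {
		MetricGroup group = getRuntimeContext().getMetricGroup();
		successfulBulkSends = group.counter("successfulBulkSends");
		failedIndexRequests = group.counter("failedIndexRequests");
	}

	@Override
	public void invoke(String value, Context context) {
		// Hand the record to the external system here, then update the counters
		// from the success/failure callbacks accordingly.
		successfulBulkSends.inc();
	}
}
```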





[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-10 Thread GitBox
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r282981545
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A mapping between Flink's catalog and Calcite's schema. This enables to 
look up and access tables
+ * in SQL queries without registering tables in advance. Databases are 
registered as sub-schemas in the schema.
+ */
+@Internal
+public class CatalogCalciteSchema implements Schema {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(CatalogCalciteSchema.class);
+
+   private final String catalogName;
+   private final Catalog catalog;
+
+   public CatalogCalciteSchema(String catalogName, Catalog catalog) {
+   this.catalogName = catalogName;
+   this.catalog = catalog;
+   }
+
+   /**
+* Look up a sub-schema (database) by the given sub-schema name.
+*
+* @param schemaName name of sub-schema to look up
+* @return the sub-schema with a given dbName, or null
+*/
+   @Override
+   public Schema getSubSchema(String schemaName) {
+
+   if (catalog.databaseExists(schemaName)) {
+   return new DatabaseCalciteSchema(schemaName, catalog);
+   } else {
+   LOGGER.error(String.format("Schema %s does not exist in 
catalog %s", schemaName, catalogName));
+   throw new CatalogException(new 
DatabaseNotExistException(catalogName, schemaName));
+   }
+   }
+
+   @Override
+   public Set<String> getSubSchemaNames() {
+   return new HashSet<>(catalog.listDatabases());
+   }
+
+   @Override
+   public Table getTable(String name) {
+   return null;
+   }
+
+   @Override
+   public Set<String> getTableNames() {
+   return new HashSet<>();
+   }
+
+   @Override
+   public RelProtoDataType getType(String name) {
+   return null;
 
 Review comment:
   Returning "null" means no type with the given name is found. I'm not sure if 
that's desired, but feel free to add a TODO item for this if necessary.




[GitHub] [flink] yuyang08 closed pull request #8067: [FLINK-11746][formats] Add thrift format support to Flink

2019-05-10 Thread GitBox
yuyang08 closed pull request #8067: [FLINK-11746][formats] Add thrift format 
support to Flink
URL: https://github.com/apache/flink/pull/8067
 
 
   




[GitHub] [flink] StephanEwen commented on issue #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions

2019-05-10 Thread GitBox
StephanEwen commented on issue #8290: [FLINK-12070] [network] Implement new 
bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290#issuecomment-491373523
 
 
   @zhijiangW Our reasoning about the three options seems similar.
   
   My understanding is that even if CancelPartition is not reliable, it can 
only fail when the TCP connection fails. In that case, all associated 
SubpartitionViews are released immediately, which should work.




[GitHub] [flink] StephanEwen commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions

2019-05-10 Thread GitBox
StephanEwen commented on a change in pull request #8290: [FLINK-12070] 
[network] Implement new bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290#discussion_r282978232
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The reader (read view) of a BoundedBlockingSubpartition.
+ */
+class BoundedBlockingSubpartitionReader implements ResultSubpartitionView {
+
+   /** The result subpartition that we read. */
+   private final BoundedBlockingSubpartition parent;
+
+   /** Further byte buffers, to handle cases where there is more data than 
fits into
+* one mapped byte buffer (2GB = Integer.MAX_VALUE). */
+   private final Iterator<ByteBuffer> furtherData;
+
+   /** The reader/decoder to the memory mapped region with the data we 
currently read from.
+* Max 2GB large. Further regions may be in the {@link #furtherData} 
field.
+* Null once the reader empty or disposed.*/
+   @Nullable
+   private BoundedBlockingSubpartitionMemory.Reader data;
+
+   /** The next buffer (look ahead). Null once the data is depleted or 
reader is disposed. */
+   @Nullable
+   private Buffer nextBuffer;
+
+   /** The remaining number of data buffers (not events) in the result. */
+   private int bufferBacklog;
+
+   /** Flag whether this reader is released. Atomic, to avoid double 
release. */
+   private boolean isReleased;
+
+   /**
+* Convenience constructor that takes a single buffer.
+*/
+   BoundedBlockingSubpartitionReader(
 
 Review comment:
   have removed it




[GitHub] [flink] StephanEwen commented on issue #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions

2019-05-10 Thread GitBox
StephanEwen commented on issue #8290: [FLINK-12070] [network] Implement new 
bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290#issuecomment-491372403
 
 
   Thank you for the reviews.
   
   I refactored some code:
 - Pulled out the `MemoryMappedBuffers` class to better test `size()` and 
region-spanning writes and avoid leaking the region configuration into the 
`BoundedBlockingSubpartition`.
 - Added comments to make the threading model assumptions more clear
 - Addressed comments on typos, etc.
   
   The code staged to be merged is in 
https://github.com/StephanEwen/incubator-flink/commit/a7cf24383be9f310fb5ccc5a032721421fa45791
   
   I think overall, this is now in a really good shape. Waiting for Travis to 
pass, will then merge this.
   
   Follow-ups are to actually make use of the "multiple read" functionality.




[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-10 Thread GitBox
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r282976905
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CalciteCatalogTable.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.schema.Table;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Thin wrapper around Calcite specific {@link Table}, this is a temporary 
solution
+ * that allows to register those tables in the {@link CatalogManager}.
+ * TODO remove once we decouple TableEnvironment from Calcite.
+ */
+@Internal
+public class CalciteCatalogTable implements CatalogBaseTable {
+   private final Table table;
+   private final FlinkTypeFactory typeFactory;
+
+   public CalciteCatalogTable(Table table, FlinkTypeFactory typeFactory) {
+   this.table = table;
+   this.typeFactory = typeFactory;
+   }
+
+   public Table getTable() {
+   return table;
+   }
+
+   @Override
+   public Map<String, String> getProperties() {
+   throw new UnsupportedOperationException("Calcite table cannot 
be expressed as a map of properties.");
+   }
+
+   @Override
+   public TableSchema getSchema() {
+   RelDataType relDataType = table.getRowType(typeFactory);
+
+   String[] fieldNames = relDataType.getFieldNames().toArray(new 
String[0]);
+   TypeInformation[] fieldTypes = relDataType.getFieldList()
+   .stream()
+   .map(field -> 
FlinkTypeFactory.toTypeInfo(field.getType())).toArray(TypeInformation[]::new);
+
+   return new TableSchema(fieldNames, fieldTypes);
+   }
+
+   @Override
+   public String getComment() {
+   return null;
+   }
+
+   @Override
+   public CatalogBaseTable copy() {
+   return this;
 
 Review comment:
   Maybe we should throw an exception here because we are not able to provide a copy. 
Returning just `this` might be dangerous because the caller assumes a copy and may make 
changes to the returned object.
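   For illustration, the alternative being suggested might look like this (sketch only):

```java
@Override
public CatalogBaseTable copy() {
	throw new UnsupportedOperationException(
		"CalciteCatalogTable is a thin wrapper around a Calcite Table and cannot be copied.");
}
```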




[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-10 Thread GitBox
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r282967862
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CalciteCatalogTable.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.schema.Table;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Thin wrapper around Calcite specific {@link Table}, this is a temporary 
solution
+ * that allows to register those tables in the {@link CatalogManager}.
+ * TODO remove once we decouple TableEnvironment from Calcite.
+ */
+@Internal
+public class CalciteCatalogTable implements CatalogBaseTable {
+   private final Table table;
+   private final FlinkTypeFactory typeFactory;
+
+   public CalciteCatalogTable(Table table, FlinkTypeFactory typeFactory) {
+   this.table = table;
+   this.typeFactory = typeFactory;
+   }
+
+   public Table getTable() {
+   return table;
+   }
+
+   @Override
+   public Map<String, String> getProperties() {
+   throw new UnsupportedOperationException("Calcite table cannot 
be expressed as a map of properties.");
 
 Review comment:
   It might be better just to return an empty map instead of throwing an 
exception. By definition, getProperties() returns any additional properties 
a table might have; it doesn't mean the property form of the table.
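   For illustration, the suggested behavior could be as simple as (sketch only):

```java
@Override
public Map<String, String> getProperties() {
	// No additional properties rather than an exception.
	return java.util.Collections.emptyMap();
}
```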




[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-10 Thread GitBox
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r282961373
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
 ##
 @@ -280,6 +316,55 @@
 */
void sqlUpdate(String stmt, QueryConfig config);
 
+   /**
+* Gets the current default catalog name of the current session.
+*
+* @return the current default catalog that is used for path resolution
+* @see TableEnvironment#setCurrentCatalog(String)
+*/
+   String getCurrentCatalogName();
+
+   /**
+* Sets the current catalog to the given value. It also sets the default
+* database to the catalog's default one. To assign both catalog and 
database explicitly
+* see {@link TableEnvironment#setCurrentDatabase(String, String)}.
+*
+* This is used during resolution of object paths. The default path 
is constructed as
+* {@code [current-catalog].[current.database]}. During the resolution, 
first we try to look for
+* {@code [default-path].[object-path]} if no object is found we assume 
the object path is a fully
+* qualified one and we look for {@code [object-path]}.
+*
+* @param name name of the catalog to set as current default catalog
+* @throws CatalogNotExistException thrown if the catalog doesn't exist
+*/
+   void setCurrentCatalog(String name) throws CatalogNotExistException;
+
+   /**
+* Gets the current default database name of the running session.
+*
+* @return the current database of the current catalog
+* @see TableEnvironment#setCurrentDatabase(String, String)
+*/
+   String getCurrentDatabaseName();
+
+   /**
+* Sets the current default catalog and database. That path will be 
used as the default one
+* when looking for unqualified object names.
+*
+* This is used during resolution of object paths. The default path 
is constructed as
+* {@code [current-catalog].[current.database]}. During the resolution, 
first we try to look for
 
 Review comment:
   should it be [current-database]?
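   Independent of the naming nit, a toy sketch of the lookup order that the quoted Javadoc describes (illustration only, not the CatalogManager implementation):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PathResolutionSketch {

	// 1. try [current-catalog].[current-database].[object-path];
	// 2. fall back to treating the object path as fully qualified.
	static List<List<String>> candidatePaths(
			String currentCatalog, String currentDatabase, String... objectPath) {
		List<String> prefixed = new ArrayList<>(Arrays.asList(currentCatalog, currentDatabase));
		prefixed.addAll(Arrays.asList(objectPath));
		return Arrays.asList(prefixed, Arrays.asList(objectPath));
	}

	public static void main(String[] args) {
		// With current catalog "cat1" and current database "db1",
		// "tbl" is tried as cat1.db1.tbl first, then as just "tbl".
		System.out.println(candidatePaths("cat1", "db1", "tbl"));
	}
}
```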




[GitHub] [flink] xuefuz commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database

2019-05-10 Thread GitBox
xuefuz commented on a change in pull request #8390: [FLINK-12469][table] Clean 
up catalog API on default/current database
URL: https://github.com/apache/flink/pull/8390#discussion_r282954828
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -65,9 +64,15 @@
private final Map> partitionColumnStats;
 
public GenericInMemoryCatalog(String name) {
+   this(name, DEFAULT_DB);
+   }
+
+   public GenericInMemoryCatalog(String name, String defaultDatabase) {
 
 Review comment:
   Cool. Got it. Will update. Thanks. 




[GitHub] [flink] StephanEwen commented on issue #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions

2019-05-10 Thread GitBox
StephanEwen commented on issue #8290: [FLINK-12070] [network] Implement new 
bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290#issuecomment-491335910
 
 
   @zhijiangW Thanks for the detailed review and good comments.




[GitHub] [flink] StephanEwen commented on issue #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions

2019-05-10 Thread GitBox
StephanEwen commented on issue #8290: [FLINK-12070] [network] Implement new 
bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290#issuecomment-491336129
 
 
   @eaglewatcherwb This implementation makes no assumption about when a partition 
will be released. That is the responsibility of the network environment.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ambition119 commented on issue #8387: [FLINK-11982] [table] File System Connector's support JSON Format and JSON file BatchTableSource

2019-05-10 Thread GitBox
ambition119 commented on issue #8387: [FLINK-11982] [table] File System 
Connector's support JSON Format and JSON file BatchTableSource
URL: https://github.com/apache/flink/pull/8387#issuecomment-491337056
 
 
   > Ad. 1 Once again we aim to separate connectors and formats => JsonFile = 
Json + File
   > Ad. 2 It provides a `FileSystem` descriptor, not a connector. See my comments 
about `CsvTableSource`. Its design is broken and we do not want to repeat this 
error.
   > Ad. 3 Good to know, Hive is not the source of truth though. Unfortunately 
we cannot accept general-purpose code that works in just a single very narrow 
setup.
   
   @StephanEwen @dawidwys Thank you for your suggestions and responses. I am 
developing based on Flink 1.6.x, and now I understand what you are talking 
about, so I will close this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ambition119 closed pull request #8387: [FLINK-11982] [table] File System Connector's support JSON Format and JSON file BatchTableSource

2019-05-10 Thread GitBox
ambition119 closed pull request #8387: [FLINK-11982] [table] File System 
Connector's support JSON Format and JSON file BatchTableSource
URL: https://github.com/apache/flink/pull/8387
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions

2019-05-10 Thread GitBox
StephanEwen commented on a change in pull request #8290: [FLINK-12070] 
[network] Implement new bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290#discussion_r282938868
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionMemory.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import javax.annotation.Nullable;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Putting and getting of buffers to/from a region of memory.
+ * This class handles the headers, length encoding, memory slicing.
+ */
+final class BoundedBlockingSubpartitionMemory {
+
+   // all fields and methods below here have package-private access to 
avoid bridge
+   // methods when accessing them from the nested classes
+
+   static final int HEADER_LENGTH = 8;
+
+   static final int HEADER_VALUE_IS_BUFFER = 0;
+
+   static final int HEADER_VALUE_IS_EVENT = 1;
+
+   static ByteBuffer checkAndConfigureByteBuffer(ByteBuffer buffer) {
+   checkArgument(buffer.position() == 0);
+   checkArgument(buffer.capacity() > 8);
+   checkArgument(buffer.limit() == buffer.capacity());
+
+   return buffer.order(ByteOrder.nativeOrder());
+   }
+
+   // 

+
+   static final class Writer {
+
+   private final ByteBuffer memory;
+
+   Writer(ByteBuffer memory) {
+   this.memory = checkAndConfigureByteBuffer(memory);
+   }
+
+   public boolean writeBuffer(Buffer buffer) {
+   final ByteBuffer memory = this.memory;
+   final int bufferSize = buffer.getSize();
+
+   if (memory.remaining() < bufferSize + HEADER_LENGTH) {
+   return false;
+   }
+
+   memory.putInt(buffer.isBuffer() ? 
HEADER_VALUE_IS_BUFFER : HEADER_VALUE_IS_EVENT);
+   memory.putInt(bufferSize);
+   memory.put(buffer.getNioBufferReadable());
+   return true;
+   }
+
+   public ByteBuffer complete() {
+   memory.flip();
+   return memory;
+   }
+
+   public int getNumBytes() {
+   return memory.position();
+   }
+   }
+
+   static final class Reader {
+
+   private final ByteBuffer memory;
+
+   Reader(ByteBuffer memory) {
+   this.memory = checkAndConfigureByteBuffer(memory);
+   }
+
+   @Nullable
+   public Buffer sliceNextBuffer() {
+   final ByteBuffer memory = this.memory;
 
 Review comment:
   Localize the scope. It makes clear to the reader that this is strictly local 
access, and often helps Java avoid some instructions (no indirection via the 
implicit `this.`).
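   As a small, self-contained illustration of that pattern (the class, field, and 
method names here are made up for the sketch, not taken from the PR):

   ```java
   import java.nio.ByteBuffer;

   final class LocalScopeExample {

       private final ByteBuffer memory = ByteBuffer.allocate(16);

       int sumOfHeaderFields() {
           // Copy the field into a local variable: the reader sees that only local
           // access follows, and the JIT does not have to re-load 'this.memory'.
           final ByteBuffer memory = this.memory;
           return memory.getInt(0) + memory.getInt(4);
       }
   }
   ```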


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions

2019-05-10 Thread GitBox
StephanEwen commented on a change in pull request #8290: [FLINK-12070] 
[network] Implement new bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290#discussion_r282938255
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PageSizeUtil.java
 ##
 @@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
+import 
org.apache.flink.shaded.netty4.io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess;
+
+import sun.misc.Unsafe;
+
+import javax.annotation.Nullable;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+/**
+ * Utility for accessing the system page size.
+ */
+final class PageSizeUtil {
+
+   /** Value indicating an unknown page size. */
+   public static final int PAGE_SIZE_UNKOWN = -1;
+
+   /** The default page size on most system. */
+   public static final int DEFAULT_PAGE_SIZE = 4 * 1024;
+
+   /** A conservative fallback value (64 KiBytes) that should be a 
multiple of the page size even
+* in some uncommon cases of servers installations with 
larger-than-usual page sizes. */
+   public static final int CONSERVATIVE_PAGE_SIZE_MULTIPLE = 64 * 1024;
+
+   /**
+* Tries to get the system page size. If the page size cannot be 
determined, this
+* returns -1.
+*
+* This internally relies on the presence of "unsafe" and the 
resolution via some
+* Netty utilities.
+*/
+   public static int getSystemPageSize() {
+   try {
+   return PageSizeUtilInternal.getSystemPageSize();
+   }
+   catch (Throwable t) {
+   ExceptionUtils.rethrowIfFatalError(t);
+   return PAGE_SIZE_UNKOWN;
+   }
+   }
+
+   /**
+* Tries to get the system page size. If the page size cannot be 
determined, this
+* returns the {@link #DEFAULT_PAGE_SIZE}.
+*/
+   public static int getSystemPageSizeOrDefault() {
+   final int pageSize = getSystemPageSize();
+   return pageSize == PAGE_SIZE_UNKOWN ? DEFAULT_PAGE_SIZE : 
pageSize;
+   }
+
+   /**
+* Tries to get the system page size. If the page size cannot be 
determined, this
+* returns the {@link #CONSERVATIVE_PAGE_SIZE_MULTIPLE}.
+*/
+   public static int getSystemPageSizeOrConservativeMultiple() {
 
 Review comment:
   This util is meant as a more general utility.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions

2019-05-10 Thread GitBox
StephanEwen commented on a change in pull request #8290: [FLINK-12070] 
[network] Implement new bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290#discussion_r282938159
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PageSizeUtil.java
 ##
 @@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
+import 
org.apache.flink.shaded.netty4.io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess;
+
+import sun.misc.Unsafe;
+
+import javax.annotation.Nullable;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+/**
+ * Utility for accessing the system page size.
+ */
+final class PageSizeUtil {
+
+   /** Value indicating an unknown page size. */
+   public static final int PAGE_SIZE_UNKOWN = -1;
+
+   /** The default page size on most system. */
+   public static final int DEFAULT_PAGE_SIZE = 4 * 1024;
+
+   /** A conservative fallback value (64 KiBytes) that should be a 
multiple of the page size even
+* in some uncommon cases of servers installations with 
larger-than-usual page sizes. */
+   public static final int CONSERVATIVE_PAGE_SIZE_MULTIPLE = 64 * 1024;
+
+   /**
+* Tries to get the system page size. If the page size cannot be 
determined, this
+* returns -1.
+*
+* This internally relies on the presence of "unsafe" and the 
resolution via some
+* Netty utilities.
+*/
+   public static int getSystemPageSize() {
+   try {
+   return PageSizeUtilInternal.getSystemPageSize();
+   }
+   catch (Throwable t) {
+   ExceptionUtils.rethrowIfFatalError(t);
+   return PAGE_SIZE_UNKOWN;
+   }
+   }
+
+   /**
+* Tries to get the system page size. If the page size cannot be 
determined, this
+* returns the {@link #DEFAULT_PAGE_SIZE}.
+*/
+   public static int getSystemPageSizeOrDefault() {
 
 Review comment:
   This util is meant as a more general utility.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions

2019-05-10 Thread GitBox
StephanEwen commented on a change in pull request #8290: [FLINK-12070] 
[network] Implement new bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290#discussion_r282938334
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PageSizeUtil.java
 ##
 @@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
+import 
org.apache.flink.shaded.netty4.io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess;
+
+import sun.misc.Unsafe;
+
+import javax.annotation.Nullable;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+/**
+ * Utility for accessing the system page size.
+ */
+final class PageSizeUtil {
+
+   /** Value indicating an unknown page size. */
+   public static final int PAGE_SIZE_UNKOWN = -1;
 
 Review comment:
   will fix that


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sjwiesman commented on a change in pull request #8330: [FLINK-12388][docs] Update the production readiness checklist

2019-05-10 Thread GitBox
sjwiesman commented on a change in pull request #8330: [FLINK-12388][docs] 
Update the production readiness checklist
URL: https://github.com/apache/flink/pull/8330#discussion_r282937987
 
 

 ##
 File path: docs/ops/production_ready.md
 ##
 @@ -22,79 +22,54 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+The production readiness checklist provides an overview of configuration 
options that should be carefully considered before bringing an Apache Flink job 
into production. 
+While the Flink community has attempted to provide sensible defaults for each 
configuration, it is important to review this list and ensure the options 
chosen are sufficient for your needs. 
+
 * ToC
 {:toc}
 
-## Production Readiness Checklist
-
-Purpose of this production readiness checklist is to provide a condensed 
overview of configuration options that are
-important and need **careful considerations** if you plan to bring your Flink 
job into **production**. For most of these options
-Flink provides out-of-the-box defaults to make usage and adoption of Flink 
easier. For many users and scenarios, those
-defaults are good starting points for development and completely sufficient 
for "one-shot" jobs. 
-
-However, once you are planning to bring a Flink application to production the 
requirements typically increase. For example,
-you want your job to be (re-)scalable and to have a good upgrade story for 
your job and new Flink versions.
-
-In the following, we present a collection of configuration options that you 
should check before your job goes into production.
-
-### Set maximum parallelism for operators explicitly
-
-Maximum parallelism is a configuration parameter that is newly introduced in 
Flink 1.2 and has important implications
-for the (re-)scalability of your Flink job. This parameter, which can be set 
on a per-job and/or per-operator granularity,
-determines the maximum parallelism to which you can scale operators. It is 
important to understand that (as of now) there
-is **no way to change** this parameter after your job has been started, except 
for restarting your job completely 
-from scratch (i.e. with a new state, and not from a previous 
checkpoint/savepoint). Even if Flink would provide some way
-to change maximum parallelism for existing savepoints in the future, you can 
already assume that for large states this is 
-likely a long running operation that you want to avoid. At this point, you 
might wonder why not just to use a very high
-value as default for this parameter. The reason behind this is that high 
maximum parallelism can have some impact on your
-application's performance and even state sizes, because Flink has to maintain 
certain metadata for its ability to rescale which
-can increase with the maximum parallelism. In general, you should choose a max 
parallelism that is high enough to fit your
-future needs in scalability, but keeping it as low as possible can give 
slightly better performance. In particular,
-a maximum parallelism higher that 128 will typically result in slightly bigger 
state snapshots from the keyed backends.
+### Set An Explicit Max Parallelism
 
-Notice that maximum parallelism must fulfill the following conditions:
+The max parallelism, set on a per-job and per-operator granularity, determines 
the maximum parallelism to which a stateful operator can scale.
+There is currently **no way to change** the maximum parallelism of an operator 
after a job has started without discarding that operators state. 
+The reason maximum parallelism exists, versus allowing stateful operators to 
be infinitely scalable, is that it has some impact on your application's 
performance and state size.
+Flink has to maintain specific metadata for its ability to rescale state which 
grows linearly with max parallelism.
+In general, you should choose max parallelism that is high enough to fit your 
future needs in scalability, while keeping it low enough to maintain reasonable 
performance.
 
-`0 < parallelism  <= max parallelism <= 2^15`
+{% panel **Note:** Maximum parallelism must fulfill the following conditions: 
`0 < parallelism  <= max parallelism <= 2^15` %}
 
-You can set the maximum parallelism by `setMaxParallelism(int 
maxparallelism)`. By default, Flink will choose the maximum
-parallelism as a function of the parallelism when the job is first started:
+You can explicitly set maximum parallelism by using `setMaxParallelism(int 
maxparallelism)`. 
+If no max parallelism is set Flink will decide using a function of the 
operators parallelism when the job is first started:
 
 - `128` : for all parallelism <= 128.
 - `MIN(nextPowerOfTwo(parallelism + (parallelism / 2)), 2^15)` : for all 
parallelism > 128.
 
-### Set UUIDs for operators
+### Set UUIDs For All Operators
 
-As mentioned in the documentation for [savepoints]({{ site.baseurl 
}}/ops/state/savepoints.html), users should set uids for
-operators. Those operator uids are 

[GitHub] [flink] fhueske commented on a change in pull request #8330: [FLINK-12388][docs] Update the production readiness checklist

2019-05-10 Thread GitBox
fhueske commented on a change in pull request #8330: [FLINK-12388][docs] Update 
the production readiness checklist
URL: https://github.com/apache/flink/pull/8330#discussion_r282917921
 
 

 ##
 File path: docs/ops/production_ready.md
 ##
 @@ -22,79 +22,54 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+The production readiness checklist provides an overview of configuration 
options that should be carefully considered before bringing an Apache Flink job 
into production. 
+While the Flink community has attempted to provide sensible defaults for each 
configuration, it is important to review this list and ensure the options 
chosen are sufficient for your needs. 
+
 * ToC
 {:toc}
 
-## Production Readiness Checklist
-
-Purpose of this production readiness checklist is to provide a condensed 
overview of configuration options that are
-important and need **careful considerations** if you plan to bring your Flink 
job into **production**. For most of these options
-Flink provides out-of-the-box defaults to make usage and adoption of Flink 
easier. For many users and scenarios, those
-defaults are good starting points for development and completely sufficient 
for "one-shot" jobs. 
-
-However, once you are planning to bring a Flink application to production the 
requirements typically increase. For example,
-you want your job to be (re-)scalable and to have a good upgrade story for 
your job and new Flink versions.
-
-In the following, we present a collection of configuration options that you 
should check before your job goes into production.
-
-### Set maximum parallelism for operators explicitly
-
-Maximum parallelism is a configuration parameter that is newly introduced in 
Flink 1.2 and has important implications
-for the (re-)scalability of your Flink job. This parameter, which can be set 
on a per-job and/or per-operator granularity,
-determines the maximum parallelism to which you can scale operators. It is 
important to understand that (as of now) there
-is **no way to change** this parameter after your job has been started, except 
for restarting your job completely 
-from scratch (i.e. with a new state, and not from a previous 
checkpoint/savepoint). Even if Flink would provide some way
-to change maximum parallelism for existing savepoints in the future, you can 
already assume that for large states this is 
-likely a long running operation that you want to avoid. At this point, you 
might wonder why not just to use a very high
-value as default for this parameter. The reason behind this is that high 
maximum parallelism can have some impact on your
-application's performance and even state sizes, because Flink has to maintain 
certain metadata for its ability to rescale which
-can increase with the maximum parallelism. In general, you should choose a max 
parallelism that is high enough to fit your
-future needs in scalability, but keeping it as low as possible can give 
slightly better performance. In particular,
-a maximum parallelism higher that 128 will typically result in slightly bigger 
state snapshots from the keyed backends.
+### Set An Explicit Max Parallelism
 
-Notice that maximum parallelism must fulfill the following conditions:
+The max parallelism, set on a per-job and per-operator granularity, determines 
the maximum parallelism to which a stateful operator can scale.
+There is currently **no way to change** the maximum parallelism of an operator 
after a job has started without discarding that operators state. 
+The reason maximum parallelism exists, versus allowing stateful operators to 
be infinitely scalable, is that it has some impact on your application's 
performance and state size.
+Flink has to maintain specific metadata for its ability to rescale state which 
grows linearly with max parallelism.
 
 Review comment:
   nit: AFAIK it does not grow linearly but logarithmically (the key group id 
needs one more bit for each power-of-two step). Maybe just drop "linearly"?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] fhueske commented on a change in pull request #8330: [FLINK-12388][docs] Update the production readiness checklist

2019-05-10 Thread GitBox
fhueske commented on a change in pull request #8330: [FLINK-12388][docs] Update 
the production readiness checklist
URL: https://github.com/apache/flink/pull/8330#discussion_r282921904
 
 

 ##
 File path: docs/ops/production_ready.md
 ##
 @@ -22,79 +22,54 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+The production readiness checklist provides an overview of configuration 
options that should be carefully considered before bringing an Apache Flink job 
into production. 
+While the Flink community has attempted to provide sensible defaults for each 
configuration, it is important to review this list and ensure the options 
chosen are sufficient for your needs. 
+
 * ToC
 {:toc}
 
-## Production Readiness Checklist
-
-Purpose of this production readiness checklist is to provide a condensed 
overview of configuration options that are
-important and need **careful considerations** if you plan to bring your Flink 
job into **production**. For most of these options
-Flink provides out-of-the-box defaults to make usage and adoption of Flink 
easier. For many users and scenarios, those
-defaults are good starting points for development and completely sufficient 
for "one-shot" jobs. 
-
-However, once you are planning to bring a Flink application to production the 
requirements typically increase. For example,
-you want your job to be (re-)scalable and to have a good upgrade story for 
your job and new Flink versions.
-
-In the following, we present a collection of configuration options that you 
should check before your job goes into production.
-
-### Set maximum parallelism for operators explicitly
-
-Maximum parallelism is a configuration parameter that is newly introduced in 
Flink 1.2 and has important implications
-for the (re-)scalability of your Flink job. This parameter, which can be set 
on a per-job and/or per-operator granularity,
-determines the maximum parallelism to which you can scale operators. It is 
important to understand that (as of now) there
-is **no way to change** this parameter after your job has been started, except 
for restarting your job completely 
-from scratch (i.e. with a new state, and not from a previous 
checkpoint/savepoint). Even if Flink would provide some way
-to change maximum parallelism for existing savepoints in the future, you can 
already assume that for large states this is 
-likely a long running operation that you want to avoid. At this point, you 
might wonder why not just to use a very high
-value as default for this parameter. The reason behind this is that high 
maximum parallelism can have some impact on your
-application's performance and even state sizes, because Flink has to maintain 
certain metadata for its ability to rescale which
-can increase with the maximum parallelism. In general, you should choose a max 
parallelism that is high enough to fit your
-future needs in scalability, but keeping it as low as possible can give 
slightly better performance. In particular,
-a maximum parallelism higher that 128 will typically result in slightly bigger 
state snapshots from the keyed backends.
+### Set An Explicit Max Parallelism
 
-Notice that maximum parallelism must fulfill the following conditions:
+The max parallelism, set on a per-job and per-operator granularity, determines 
the maximum parallelism to which a stateful operator can scale.
+There is currently **no way to change** the maximum parallelism of an operator 
after a job has started without discarding that operators state. 
 
 Review comment:
   un-highlight "no way to change"? The other aspects have similar issues and 
are not highlighted.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] fhueske commented on a change in pull request #8330: [FLINK-12388][docs] Update the production readiness checklist

2019-05-10 Thread GitBox
fhueske commented on a change in pull request #8330: [FLINK-12388][docs] Update 
the production readiness checklist
URL: https://github.com/apache/flink/pull/8330#discussion_r282919635
 
 

 ##
 File path: docs/ops/production_ready.md
 ##
 @@ -22,79 +22,54 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+The production readiness checklist provides an overview of configuration 
options that should be carefully considered before bringing an Apache Flink job 
into production. 
+While the Flink community has attempted to provide sensible defaults for each 
configuration, it is important to review this list and ensure the options 
chosen are sufficient for your needs. 
+
 * ToC
 {:toc}
 
-## Production Readiness Checklist
-
-Purpose of this production readiness checklist is to provide a condensed 
overview of configuration options that are
-important and need **careful considerations** if you plan to bring your Flink 
job into **production**. For most of these options
-Flink provides out-of-the-box defaults to make usage and adoption of Flink 
easier. For many users and scenarios, those
-defaults are good starting points for development and completely sufficient 
for "one-shot" jobs. 
-
-However, once you are planning to bring a Flink application to production the 
requirements typically increase. For example,
-you want your job to be (re-)scalable and to have a good upgrade story for 
your job and new Flink versions.
-
-In the following, we present a collection of configuration options that you 
should check before your job goes into production.
-
-### Set maximum parallelism for operators explicitly
-
-Maximum parallelism is a configuration parameter that is newly introduced in 
Flink 1.2 and has important implications
-for the (re-)scalability of your Flink job. This parameter, which can be set 
on a per-job and/or per-operator granularity,
-determines the maximum parallelism to which you can scale operators. It is 
important to understand that (as of now) there
-is **no way to change** this parameter after your job has been started, except 
for restarting your job completely 
-from scratch (i.e. with a new state, and not from a previous 
checkpoint/savepoint). Even if Flink would provide some way
-to change maximum parallelism for existing savepoints in the future, you can 
already assume that for large states this is 
-likely a long running operation that you want to avoid. At this point, you 
might wonder why not just to use a very high
-value as default for this parameter. The reason behind this is that high 
maximum parallelism can have some impact on your
-application's performance and even state sizes, because Flink has to maintain 
certain metadata for its ability to rescale which
-can increase with the maximum parallelism. In general, you should choose a max 
parallelism that is high enough to fit your
-future needs in scalability, but keeping it as low as possible can give 
slightly better performance. In particular,
-a maximum parallelism higher that 128 will typically result in slightly bigger 
state snapshots from the keyed backends.
+### Set An Explicit Max Parallelism
 
-Notice that maximum parallelism must fulfill the following conditions:
+The max parallelism, set on a per-job and per-operator granularity, determines 
the maximum parallelism to which a stateful operator can scale.
+There is currently **no way to change** the maximum parallelism of an operator 
after a job has started without discarding that operators state. 
+The reason maximum parallelism exists, versus allowing stateful operators to 
be infinitely scalable, is that it has some impact on your application's 
performance and state size.
+Flink has to maintain specific metadata for its ability to rescale state which 
grows linearly with max parallelism.
+In general, you should choose max parallelism that is high enough to fit your 
future needs in scalability, while keeping it low enough to maintain reasonable 
performance.
 
-`0 < parallelism  <= max parallelism <= 2^15`
+{% panel **Note:** Maximum parallelism must fulfill the following conditions: 
`0 < parallelism  <= max parallelism <= 2^15` %}
 
-You can set the maximum parallelism by `setMaxParallelism(int 
maxparallelism)`. By default, Flink will choose the maximum
-parallelism as a function of the parallelism when the job is first started:
+You can explicitly set maximum parallelism by using `setMaxParallelism(int 
maxparallelism)`. 
+If no max parallelism is set Flink will decide using a function of the 
operators parallelism when the job is first started:
 
 - `128` : for all parallelism <= 128.
 - `MIN(nextPowerOfTwo(parallelism + (parallelism / 2)), 2^15)` : for all 
parallelism > 128.
 
-### Set UUIDs for operators
+### Set UUIDs For All Operators
 
-As mentioned in the documentation for [savepoints]({{ site.baseurl 
}}/ops/state/savepoints.html), users should set uids for
-operators. Those operator uids are 

[GitHub] [flink] fhueske commented on a change in pull request #8330: [FLINK-12388][docs] Update the production readiness checklist

2019-05-10 Thread GitBox
fhueske commented on a change in pull request #8330: [FLINK-12388][docs] Update 
the production readiness checklist
URL: https://github.com/apache/flink/pull/8330#discussion_r282920531
 
 

 ##
 File path: docs/ops/production_ready.md
 ##
 @@ -22,79 +22,54 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+The production readiness checklist provides an overview of configuration 
options that should be carefully considered before bringing an Apache Flink job 
into production. 
+While the Flink community has attempted to provide sensible defaults for each 
configuration, it is important to review this list and ensure the options 
chosen are sufficient for your needs. 
+
 * ToC
 {:toc}
 
-## Production Readiness Checklist
-
-Purpose of this production readiness checklist is to provide a condensed 
overview of configuration options that are
-important and need **careful considerations** if you plan to bring your Flink 
job into **production**. For most of these options
-Flink provides out-of-the-box defaults to make usage and adoption of Flink 
easier. For many users and scenarios, those
-defaults are good starting points for development and completely sufficient 
for "one-shot" jobs. 
-
-However, once you are planning to bring a Flink application to production the 
requirements typically increase. For example,
-you want your job to be (re-)scalable and to have a good upgrade story for 
your job and new Flink versions.
-
-In the following, we present a collection of configuration options that you 
should check before your job goes into production.
-
-### Set maximum parallelism for operators explicitly
-
-Maximum parallelism is a configuration parameter that is newly introduced in 
Flink 1.2 and has important implications
-for the (re-)scalability of your Flink job. This parameter, which can be set 
on a per-job and/or per-operator granularity,
-determines the maximum parallelism to which you can scale operators. It is 
important to understand that (as of now) there
-is **no way to change** this parameter after your job has been started, except 
for restarting your job completely 
-from scratch (i.e. with a new state, and not from a previous 
checkpoint/savepoint). Even if Flink would provide some way
-to change maximum parallelism for existing savepoints in the future, you can 
already assume that for large states this is 
-likely a long running operation that you want to avoid. At this point, you 
might wonder why not just to use a very high
-value as default for this parameter. The reason behind this is that high 
maximum parallelism can have some impact on your
-application's performance and even state sizes, because Flink has to maintain 
certain metadata for its ability to rescale which
-can increase with the maximum parallelism. In general, you should choose a max 
parallelism that is high enough to fit your
-future needs in scalability, but keeping it as low as possible can give 
slightly better performance. In particular,
-a maximum parallelism higher that 128 will typically result in slightly bigger 
state snapshots from the keyed backends.
+### Set An Explicit Max Parallelism
 
-Notice that maximum parallelism must fulfill the following conditions:
+The max parallelism, set on a per-job and per-operator granularity, determines 
the maximum parallelism to which a stateful operator can scale.
+There is currently **no way to change** the maximum parallelism of an operator 
after a job has started without discarding that operators state. 
+The reason maximum parallelism exists, versus allowing stateful operators to 
be infinitely scalable, is that it has some impact on your application's 
performance and state size.
+Flink has to maintain specific metadata for its ability to rescale state which 
grows linearly with max parallelism.
+In general, you should choose max parallelism that is high enough to fit your 
future needs in scalability, while keeping it low enough to maintain reasonable 
performance.
 
-`0 < parallelism  <= max parallelism <= 2^15`
+{% panel **Note:** Maximum parallelism must fulfill the following conditions: 
`0 < parallelism  <= max parallelism <= 2^15` %}
 
-You can set the maximum parallelism by `setMaxParallelism(int 
maxparallelism)`. By default, Flink will choose the maximum
-parallelism as a function of the parallelism when the job is first started:
+You can explicitly set maximum parallelism by using `setMaxParallelism(int 
maxparallelism)`. 
+If no max parallelism is set Flink will decide using a function of the 
operators parallelism when the job is first started:
 
 - `128` : for all parallelism <= 128.
 - `MIN(nextPowerOfTwo(parallelism + (parallelism / 2)), 2^15)` : for all 
parallelism > 128.
 
-### Set UUIDs for operators
+### Set UUIDs For All Operators
 
-As mentioned in the documentation for [savepoints]({{ site.baseurl 
}}/ops/state/savepoints.html), users should set uids for
-operators. Those operator uids are 

[GitHub] [flink] yanghua commented on a change in pull request #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore

2019-05-10 Thread GitBox
yanghua commented on a change in pull request #8410: [FLINK-11159] Allow 
configuration whether to fall back to savepoints for restore
URL: https://github.com/apache/flink/pull/8410#discussion_r282921569
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -183,6 +184,8 @@
/** Registry that tracks state which is shared across (incremental) 
checkpoints. */
private SharedStateRegistry sharedStateRegistry;
 
+   private boolean isPerferCheckpointForRecovery;
 
 Review comment:
   Sorry for the typo, will fix it soon.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore

2019-05-10 Thread GitBox
yanghua commented on a change in pull request #8410: [FLINK-11159] Allow 
configuration whether to fall back to savepoints for restore
URL: https://github.com/apache/flink/pull/8410#discussion_r282921234
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 ##
 @@ -72,6 +72,9 @@
/** Determines if a tasks are failed or not if there is an error in 
their checkpointing. Default: true */
private boolean failOnCheckpointingErrors = true;
 
+   /** Determines if a job will fallback to checkpoint when there is a 
savepoint later than checkpoint. **/
+   private boolean perferCheckpointForRecovery = false;
 
 Review comment:
   I think this config option belongs to the checkpointing domain, so providing it in 
`CheckpointConfig` sounds more reasonable. What's more, it is a job-level 
config option, right? However, `flink-conf.yaml` is a cluster-level 
configuration file (if users deploy Flink in standalone mode, it will affect 
all the jobs).
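   To make the job-level point concrete, a minimal sketch of what per-job usage could 
look like if the option stays in `CheckpointConfig` (the setter name is only assumed 
from the field added in this PR and may differ in the final API):

   ```java
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

   public class PreferCheckpointForRecoveryExample {
       public static void main(String[] args) {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           env.enableCheckpointing(60_000); // checkpoint every 60 seconds
           // assumed setter for the flag introduced in this PR
           env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
       }
   }
   ```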


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore

2019-05-10 Thread GitBox
yanghua commented on a change in pull request #8410: [FLINK-11159] Allow 
configuration whether to fall back to savepoints for restore
URL: https://github.com/apache/flink/pull/8410#discussion_r282919368
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -1093,6 +1098,23 @@ public boolean restoreLatestCheckpointedState(
}
}
 
+   CompletedCheckpoint candidate = null;
+   if (isPerferCheckpointForRecovery && 
completedCheckpointStore.getAllCheckpoints().size() > 1) {
 
 Review comment:
   Agree.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gyfora commented on a change in pull request #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore

2019-05-10 Thread GitBox
gyfora commented on a change in pull request #8410: [FLINK-11159] Allow 
configuration whether to fall back to savepoints for restore
URL: https://github.com/apache/flink/pull/8410#discussion_r282908128
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -1093,6 +1098,23 @@ public boolean restoreLatestCheckpointedState(
}
}
 
+   CompletedCheckpoint candidate = null;
+   if (isPerferCheckpointForRecovery && 
completedCheckpointStore.getAllCheckpoints().size() > 1) {
 
 Review comment:
   It feels like this part basically reimplements the getLatestCheckpoint() 
logic of the completed checkpoint store. Maybe instead of having this here, 
getLatestCheckpoint() could take the preferCheckpoints boolean parameter for 
better encapsulation.
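   A rough sketch of that suggestion (a hypothetical interface shape only; the boolean 
parameter is the reviewer's proposal and does not exist in the current API):

   ```java
   import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;

   interface CompletedCheckpointStoreSketch {

       /**
        * Returns the latest completed checkpoint, optionally preferring
        * checkpoints over savepoints, or null if none exists.
        */
       CompletedCheckpoint getLatestCheckpoint(boolean preferCheckpointForRecovery) throws Exception;
   }
   ```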


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gyfora commented on a change in pull request #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore

2019-05-10 Thread GitBox
gyfora commented on a change in pull request #8410: [FLINK-11159] Allow 
configuration whether to fall back to savepoints for restore
URL: https://github.com/apache/flink/pull/8410#discussion_r282907507
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##
 @@ -183,6 +184,8 @@
/** Registry that tracks state which is shared across (incremental) 
checkpoints. */
private SharedStateRegistry sharedStateRegistry;
 
+   private boolean isPerferCheckpointForRecovery;
 
 Review comment:
   There is a typo in basically every use of the word 
"preferCheckpointForRecovery": it should be "prefer", not "perfer". I think the 
easiest fix would be to search the changes and correct it in every place.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gyfora commented on a change in pull request #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore

2019-05-10 Thread GitBox
gyfora commented on a change in pull request #8410: [FLINK-11159] Allow 
configuration whether to fall back to savepoints for restore
URL: https://github.com/apache/flink/pull/8410#discussion_r282909021
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 ##
 @@ -72,6 +72,9 @@
/** Determines if a tasks are failed or not if there is an error in 
their checkpointing. Default: true */
private boolean failOnCheckpointingErrors = true;
 
+   /** Determines if a job will fallback to checkpoint when there is a 
savepoint later than checkpoint. **/
+   private boolean perferCheckpointForRecovery = false;
 
 Review comment:
   I am personally not a big fan of configuring checkpoint parameters in the 
CheckpointConfig class; I actually wish that all these options were exposed 
as general Flink configuration parameters (settable in flink-conf.yaml).
   
   What do you think, @StefanRRichter?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor

2019-05-10 Thread GitBox
azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers 
the termination of all running Tasks when shutting down TaskExecutor
URL: https://github.com/apache/flink/pull/7757#discussion_r282897244
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ##
 @@ -348,19 +352,26 @@ private void 
handleStartTaskExecutorServicesException(Exception e) throws Except
resourceManagerHeartbeatManager.stop();
 
try {
-   stopTaskExecutorServices();
+   CompletableFuture onStopFuture = 
stopTaskExecutorServices(failingTaskFutures);
+   log.info("Stopped TaskExecutor {}.", getAddress());
 
 Review comment:
   This log statement actually happens after `taskManagerMetricGroup.close()`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor

2019-05-10 Thread GitBox
azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers 
the termination of all running Tasks when shutting down TaskExecutor
URL: https://github.com/apache/flink/pull/7757#discussion_r282884262
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ##
 @@ -335,9 +336,12 @@ private void 
handleStartTaskExecutorServicesException(Exception e) throws Except
resourceManagerConnection.close();
}
 
+   List> failingTaskFutures = new 
ArrayList<>();
for (JobManagerConnection jobManagerConnection : 
jobManagerConnections.values()) {
try {
-   
disassociateFromJobManager(jobManagerConnection, new FlinkException("The 
TaskExecutor is shutting down."));
+   FlinkException cause = new FlinkException("The 
TaskExecutor is shutting down.");
+   failTasks(failingTaskFutures, 
jobManagerConnection, cause);
 
 Review comment:
   I would rather make `failTasks` return `List>` 
instead of passing it as an argument,
   and then:
   `failingTaskFutures.addAll(failTasks(jobManagerConnection, cause));`
   This way it is more explicit what is happening.
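   A generic, self-contained sketch of the suggested shape; Flink's Task and 
JobManagerConnection types are replaced by placeholders here to keep the sketch 
compilable on its own:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   class FailTasksSketch {

       List<CompletableFuture<Void>> failTasks(Object jobManagerConnection, Throwable cause) {
           List<CompletableFuture<Void>> futures = new ArrayList<>();
           // ... fail every task that belongs to this connection and collect
           //     the future that completes once the task has terminated ...
           return futures;
       }

       List<CompletableFuture<Void>> failAllTasks(List<Object> connections, Throwable cause) {
           List<CompletableFuture<Void>> failingTaskFutures = new ArrayList<>();
           for (Object connection : connections) {
               // the caller now sees explicitly where the futures are collected
               failingTaskFutures.addAll(failTasks(connection, cause));
           }
           return failingTaskFutures;
       }
   }
   ```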


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor

2019-05-10 Thread GitBox
azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers 
the termination of all running Tasks when shutting down TaskExecutor
URL: https://github.com/apache/flink/pull/7757#discussion_r282882662
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -269,6 +272,9 @@
/** Initialized from the Flink configuration. May also be set at the 
ExecutionConfig */
private long taskCancellationTimeout;
 
+   /** Future which is completed once the Task#run method is completed */
+   private CompletableFuture taskRunFuture = new 
CompletableFuture<>();
 
 Review comment:
   Maybe `taskCompletionFuture` would be a better name.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor

2019-05-10 Thread GitBox
azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers 
the termination of all running Tasks when shutting down TaskExecutor
URL: https://github.com/apache/flink/pull/7757#discussion_r282888225
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ##
 @@ -348,19 +352,26 @@ private void 
handleStartTaskExecutorServicesException(Exception e) throws Except
resourceManagerHeartbeatManager.stop();
 
try {
-   stopTaskExecutorServices();
+   CompletableFuture onStopFuture = 
stopTaskExecutorServices(failingTaskFutures);
+   log.info("Stopped TaskExecutor {}.", getAddress());
+   return onStopFuture;
} catch (Exception e) {
throwable = ExceptionUtils.firstOrSuppressed(e, 
throwable);
-   }
-
-   if (throwable != null) {
return FutureUtils.completedExceptionally(new 
FlinkException("Error while shutting the TaskExecutor down.", throwable));
-   } else {
-   log.info("Stopped TaskExecutor {}.", getAddress());
-   return CompletableFuture.completedFuture(null);
}
}
 
+   private void failTasks(List> failingTaskFutures,
+   JobManagerConnection jobManagerConnection,
+   Throwable cause) {
+   Iterator tasks = 
taskSlotTable.getTasks(jobManagerConnection.getJobID());
+   while (tasks.hasNext()) {
+   try {
+   
failingTaskFutures.add(tasks.next().failExternally(cause));
+   } catch (IllegalStateException ignored) {}
 
 Review comment:
   Why is the exception ignored? Should we not rather add it?
   `failingTaskFutures.add(new 
CompletableFuture<>().completeExceptionally(ignored))`
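   Since `completeExceptionally` returns a boolean rather than the future itself, a 
compilable form of that idea could look like the following sketch (method and 
parameter names are made up for illustration):

   ```java
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   class ExceptionallyCompletedFutureSketch {

       static void addFailedFuture(List<CompletableFuture<Void>> failingTaskFutures, Exception ignored) {
           CompletableFuture<Void> failed = new CompletableFuture<>();
           failed.completeExceptionally(ignored);   // mark the future as failed
           failingTaskFutures.add(failed);          // then collect it
       }
   }
   ```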


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor

2019-05-10 Thread GitBox
azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers 
the termination of all running Tasks when shutting down TaskExecutor
URL: https://github.com/apache/flink/pull/7757#discussion_r282882465
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
 ##
 @@ -1000,28 +1008,28 @@ public void cancelExecution() {
 * This method never blocks.
 */
@Override
-   public void failExternally(Throwable cause) {
+   public CompletableFuture failExternally(Throwable cause) {
 
 Review comment:
   If we add a `public Task#getCompletionFuture()` method, we can avoid all 
changes in `failExternally/cancelOrFailAndCancelInvokable`. 
`failExternally/cancelOrFailAndCancelInvokable` can still be used to trigger 
the cancellation, but what we are interested in is the completion of `Task#run`, 
which is a subsequent result of the cancellation.
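   A minimal sketch of that pattern, with Flink's Task stripped down to the relevant 
parts (the names follow the discussion, not the actual class):

   ```java
   import java.util.concurrent.CompletableFuture;

   class TaskSketch {

       private final CompletableFuture<Void> taskCompletionFuture = new CompletableFuture<>();

       /** Completed once run() has finished, no matter why it finished. */
       public CompletableFuture<Void> getCompletionFuture() {
           return taskCompletionFuture;
       }

       /** Only triggers cancellation; callers that need to wait use getCompletionFuture(). */
       public void failExternally(Throwable cause) {
           // ... transition the task to FAILED and interrupt the invokable ...
       }

       public void run() {
           try {
               // ... execute the invokable ...
           } finally {
               taskCompletionFuture.complete(null);
           }
       }
   }
   ```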


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor

2019-05-10 Thread GitBox
azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers 
the termination of all running Tasks when shutting down TaskExecutor
URL: https://github.com/apache/flink/pull/7757#discussion_r282884777
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ##
 @@ -348,19 +352,26 @@ private void 
handleStartTaskExecutorServicesException(Exception e) throws Except
resourceManagerHeartbeatManager.stop();
 
try {
-   stopTaskExecutorServices();
+   CompletableFuture onStopFuture = 
stopTaskExecutorServices(failingTaskFutures);
+   log.info("Stopped TaskExecutor {}.", getAddress());
+   return onStopFuture;
 
 Review comment:
   If `throwable != null` at this point of `return onStopFuture;`, we ignore 
the `throwable`, which was not the case previously.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor

2019-05-10 Thread GitBox
azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers 
the termination of all running Tasks when shutting down TaskExecutor
URL: https://github.com/apache/flink/pull/7757#discussion_r282896944
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ##
 @@ -396,6 +407,50 @@ private void stopTaskExecutorServices() throws Exception {
ExceptionUtils.tryRethrowException(exception);
}
 
+   private CompletableFuture stopTaskExecutorServices(
+   List> stoppingTaskFutures) 
throws Exception {
+   Exception exception = null;
+
+   try {
+   jobLeaderService.stop();
 
 Review comment:
   This looks like code duplicated from the previous method 
`stopTaskExecutorServices`.
   Can we split `stopTaskExecutorServices` into two methods and call them 
respectively from `stopTaskExecutorServices` and `onStop`? Then we would not need 
`stopTaskExecutorServices(stoppingTaskFutures)` at all.
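
   A rough sketch of the suggested refactoring, with hypothetical method names 
(`stopSharedServices`, `stopTaskExecutorServicesAsync`) that are not part of the 
actual PR:
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;
   
   // The shared shutdown steps live in one private method that both the
   // synchronous path and the asynchronous onStop path reuse.
   class TaskExecutorShutdownSketch {
   
       private void stopSharedServices() throws Exception {
           // jobLeaderService.stop(); taskSlotTable.stop(); ... (elided)
       }
   
       /** Existing synchronous entry point. */
       void stopTaskExecutorServices() throws Exception {
           stopSharedServices();
       }
   
       /** Asynchronous entry point used from onStop, after the tasks have terminated. */
       CompletableFuture<Void> stopTaskExecutorServicesAsync(CompletableFuture<Void> tasksTerminated) {
           return tasksTerminated.thenRun(() -> {
               try {
                   stopSharedServices();
               } catch (Exception e) {
                   throw new CompletionException(e);
               }
           });
       }
   }
   ```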


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on issue #8387: [FLINK-11982] [table] File System Connector's support JSON Format and JSON file BatchTableSource

2019-05-10 Thread GitBox
dawidwys commented on issue #8387: [FLINK-11982] [table] File System 
Connector's support JSON Format and JSON file BatchTableSource
URL: https://github.com/apache/flink/pull/8387#issuecomment-491308505
 
 
   Ad. 1 Once again, we aim to separate connectors and formats => JsonFile = Json 
+ File
   Ad. 2 It provides a `FileSystem` descriptor, not a connector. See my comments 
about `CsvTableSource`. Its design is broken and we do not want to repeat this 
mistake.
   Ad. 3 Good to know, but Hive is not the source of truth. Unfortunately we 
cannot accept general-purpose code that works in just a single, very narrow 
setup.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on issue #8359: [FLINK-11051][table] Add streaming window FlatAggregate to Table API

2019-05-10 Thread GitBox
hequn8128 commented on issue #8359: [FLINK-11051][table] Add streaming window 
FlatAggregate to Table API
URL: https://github.com/apache/flink/pull/8359#issuecomment-491306744
 
 
   @sunjincheng121 Thank you very much for your suggestions and meticulous 
review. I have addressed all your comments and updated the PR. Would be great 
if you can take another look. Thank you.
   
   Best, Hequn


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database

2019-05-10 Thread GitBox
dawidwys commented on a change in pull request #8390: [FLINK-12469][table] 
Clean up catalog API on default/current database
URL: https://github.com/apache/flink/pull/8390#discussion_r282904007
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -65,9 +64,15 @@
private final Map> partitionColumnStats;
 
public GenericInMemoryCatalog(String name) {
+   this(name, DEFAULT_DB);
+   }
+
+   public GenericInMemoryCatalog(String name, String defaultDatabase) {
 
 Review comment:
   No, what I mean is to allow the user to provide the default DB. In other words, to 
have a ctor that does not create the default DB, but uses the one provided by 
the user.
   
   So the ctor would look like:
   ```
   public GenericInMemoryCatalog(String name, String defaultDbName, CatalogDatabase defaultDb) {
       checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty");
   
       this.currentDatabase = defaultDbName;
       this.catalogName = name;
       this.databases = new LinkedHashMap<>();
       this.databases.put(defaultDbName, defaultDb);
       this.tables = new LinkedHashMap<>();
       this.functions = new LinkedHashMap<>();
       this.partitions = new LinkedHashMap<>();
       this.tableStats = new LinkedHashMap<>();
       this.tableColumnStats = new LinkedHashMap<>();
       this.partitionStats = new LinkedHashMap<>();
       this.partitionColumnStats = new LinkedHashMap<>();
   }
   ```
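
   For illustration only, the existing one-argument constructor could then delegate 
to such a ctor; `createDefaultDatabase()` below is a hypothetical helper, not part 
of the code under review:
   ```java
   public GenericInMemoryCatalog(String name) {
       // keep the old behaviour: create the built-in default database ourselves
       this(name, DEFAULT_DB, createDefaultDatabase());
   }
   ```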


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ambition119 edited a comment on issue #8387: [FLINK-11982] [table] File System Connector's support JSON Format and JSON file BatchTableSource

2019-05-10 Thread GitBox
ambition119 edited a comment on issue #8387: [FLINK-11982] [table] File System 
Connector's support JSON Format and JSON file BatchTableSource
URL: https://github.com/apache/flink/pull/8387#issuecomment-491298616
 
 
   > Hi @ambition119
   > I agree with @StephanEwen that this PR mixes the concepts of 
`connector`(file) and `format`(json).
   > 
   > I don't necessarily understand the comment that `JsonRowFormatFactory` 
does not support File System Connector. First, there is no file system 
connector, and second of all this is a format factory thus it should have no 
notion of connector.
   > 
   > I think you could rework your `JsonBatchTableSource` and 
`JsonRowInputFormat` to accept any `DeserializationSchema` similar to how 
`org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase` works.
   > 
   > Another issue on a more conceptual layer is that the proposed 
`RowInputFormat` assumes that each line in a file is a separate record. I agree 
this is probably the most common case, but not the only one.
   > Files can be also written with a different layout (e.g. Parquet), thus we 
should differentiate that also on the connector level.
   > 
   > I'm guessing you might have been inspired by `CsvTableSource`, but it was 
one of the first `TableSource`s that was written before we decided to split 
connectors and formats and its design is flawed. It also uses 
`RowCsvInputFormat` that as I said in the previous paragraph applies custom 
block(in this case line) splitting based on configurable delimiter.
   
   Thank you for your reply and discussion. 
   
   First of all, the naming of this PR is a bit confusing. The code 
mainly supports JSON files, local or on HDFS, so I can rename it to 
**JsonFileXxx**.
   
   Second, Flink does have a File System Connector; the official documentation is
   
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/connect.html#file-system-connector,
 and it provides 
[CsvBatchTableSourceFactory](https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory#L16)
  to support reading CSV files.
   
![image](https://user-images.githubusercontent.com/20353043/57529676-0fc6f180-7368-11e9-9828-bb1644013972.png)
   
   The Flink master branch has a unit test, [FileSystem connector CSV 
Format](https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala#L59),
   and an example:
   ```java
   public static void main(String[] args) throws Exception {
       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
       BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
       FileSystem connector = new FileSystem().path("/path/to/csv/in.csv");

       OldCsv oldCsv = new OldCsv()
               .field("a", Types.STRING())
               .field("b", Types.INT())
               .field("c", Types.BOOLEAN());
       Schema schema = new Schema()
               .field("a", Types.STRING())
               .field("b", Types.INT())
               .field("c", Types.BOOLEAN());

       tEnv.connect(connector)
               .withFormat(oldCsv)
               .withSchema(schema)
               .registerTableSource("csvTable");

       String fullSql = "select * FROM csvTable";
       Table fullTable = tEnv.sqlQuery(fullSql);

       DataSet<Row> fullDS = tEnv.toDataSet(fullTable, Row.class);
       List<Row> result = fullDS.collect();

       System.out.println(result);
   }
   ```
   
   Third, Hive, for example, also requires each JSON record to be on a single 
line; otherwise the file is not supported. E.g.:
   ```json
   {
"fruit": [{
"weight": 8,
"type": "apple"
}, {
"weight": 9,
"type": "pear"
}],
"author": "J. R. R. Tolkien",
"title": "The Lord of the Rings",
"category": "fiction",
"price": 22.99,
"isbn": "0-395-19395-8",
"email": "amy@only_for_json_udf_test.net",
"owner": "amy",
"zip_code": "94025",
"fb_id": "1234"
   }
   ```
   thanks.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-10 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r282898836
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final Map 
deploymentOptions;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.intermediateDataSets = new HashMap<>();
+   this.deploymentOptions = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new ArrayList<>();
+   DeploymentOption updateOption = new DeploymentOption(true);
+   DeploymentOption nonUpdateOption = new DeploymentOption(false);
+
+   for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   DeploymentOption option = nonUpdateOption;
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.addSchedulingResultPartition(srp);
+   if (srp.getPartitionType().isPipelined()) {
+   option = updateOption;
+   }
+   }
+   deploymentOptions.put(schedulingVertex.getId(), option);
+
+   if 
(schedulingVertex.getConsumedResultPartitions().isEmpty()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), option));
+   }
+   }
+
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void restartTasks(Set verticesToRestart) {
+   // increase counter of the dataset first
+   for (ExecutionVertexID executionVertexId : 

[GitHub] [flink] yanghua commented on issue #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore

2019-05-10 Thread GitBox
yanghua commented on issue #8410: [FLINK-11159] Allow configuration whether to 
fall back to savepoints for restore
URL: https://github.com/apache/flink/pull/8410#issuecomment-491302415
 
 
   @gyfora If you have time, can you review this PR for me? thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-12473) Add the interface of ML pipeline and ML lib

2019-05-10 Thread Shaoxuan Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shaoxuan Wang reassigned FLINK-12473:
-

Assignee: Luo Gen

> Add the interface of ML pipeline and ML lib
> ---
>
> Key: FLINK-12473
> URL: https://issues.apache.org/jira/browse/FLINK-12473
> Project: Flink
>  Issue Type: Sub-task
>  Components: Library / Machine Learning
>Reporter: Shaoxuan Wang
>Assignee: Luo Gen
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2019-05-10-12-50-15-869.png
>
>   Original Estimate: 168h
>  Time Spent: 10m
>  Remaining Estimate: 167h 50m
>
> This Jira will introduce the major interfaces for ML pipeline and ML lib.
> The major interfaces and their relationship diagram are shown below. For 
> more details, please refer to the [FLIP39 design 
> doc|https://docs.google.com/document/d/1StObo1DLp8iiy0rbukx8kwAJb0BwDZrQrMWub3DzsEo]
>  
> !image-2019-05-10-12-50-15-869.png!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-10 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r282892019
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+
+/**
+ * Strategy test utilities.
+ */
+public class StrategyTestUtil {
+
+   static void initVerticesAndPartitions(
+   TestingSchedulingTopology schedulingTopology,
+   ResultPartitionType[] partitionTypes,
+   InputDependencyConstraint[] inputDependencyConstraints,
+   TestingSchedulingExecutionVertex[][] vertices,
+   TestingSchedulingResultPartition[][] partitions) {
+
+   ResultPartitionID[][] resultPartitionIds = new 
ResultPartitionID[vertices.length - 1][vertices[0].length];
+   initVerticesAndPartitions(schedulingTopology, partitionTypes, 
inputDependencyConstraints,
+   vertices, resultPartitionIds, partitions);
+   }
+
+   static void initVerticesAndPartitions(
+   TestingSchedulingTopology schedulingTopology,
+   ResultPartitionType[] partitionTypes,
+   InputDependencyConstraint[] inputDependencyConstraints,
+   TestingSchedulingExecutionVertex[][] vertices,
+   ResultPartitionID[][] resultPartitionIds,
+   TestingSchedulingResultPartition[][] partitions) {
+
+   final int jobVertexCnt = vertices.length;
+   final int taskCnt = vertices[0].length;
+
+   JobVertexID[] jobVertexIds = new JobVertexID[jobVertexCnt];
+   IntermediateDataSetID[] dataSetIds = new 
IntermediateDataSetID[jobVertexCnt];
+   for (int i = 0; i < jobVertexCnt; i++) {
+   jobVertexIds[i] = new JobVertexID();
+   dataSetIds[i] = new IntermediateDataSetID();
+   
schedulingTopology.addInputDependencyConstraint(jobVertexIds[i], 
inputDependencyConstraints[i]);
+   }
+
+   for (int i = 0; i < jobVertexCnt; i++) {
+   for (int j = 0; j < taskCnt; j++) {
+   vertices[i][j] = new 
TestingSchedulingExecutionVertex(jobVertexIds[i], j);
+   
schedulingTopology.addSchedulingExecutionVertex(vertices[i][j]);
+
+   if (i != jobVertexCnt - 1) {
+   partitions[i][j] = new 
TestingSchedulingResultPartition(dataSetIds[i], new 
IntermediateResultPartitionID(),
+   partitionTypes[i], 
vertices[i][j]);
+   resultPartitionIds[i][j] = new 
ResultPartitionID(partitions[i][j].getId(), new ExecutionAttemptID());
+   
schedulingTopology.addResultPartition(partitions[i][j].getId(), 
partitions[i][j]);
 
 Review comment:
   The method `addResultPartition` should not exist. It should only be allowed 
to add new partitions to the topology via `SchedulingExecutionVertex`. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:

[GitHub] [flink] ambition119 commented on issue #8387: [FLINK-11982] [table] File System Connector's support JSON Format and JSON file BatchTableSource

2019-05-10 Thread GitBox
ambition119 commented on issue #8387: [FLINK-11982] [table] File System 
Connector's support JSON Format and JSON file BatchTableSource
URL: https://github.com/apache/flink/pull/8387#issuecomment-491298616
 
 
   > Hi @ambition119
   > I agree with @StephanEwen that this PR mixes the concepts of 
`connector`(file) and `format`(json).
   > 
   > I don't necessarily understand the comment that `JsonRowFormatFactory` 
does not support File System Connector. First, there is no file system 
connector, and second of all this is a format factory thus it should have no 
notion of connector.
   > 
   > I think you could rework your `JsonBatchTableSource` and 
`JsonRowInputFormat` to accept any `DeserializationSchema` similar to how 
`org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase` works.
   > 
   > Another issue on a more conceptual layer is that the proposed 
`RowInputFormat` assumes that each line in a file is a separate record. I agree 
this is probably the most common case, but not the only one.
   > Files can be also written with a different layout (e.g. Parquet), thus we 
should differentiate that also on the connector level.
   > 
   > I'm guessing you might have been inspired by `CsvTableSource`, but it was 
one of the first `TableSource`s that was written before we decided to split 
connectors and formats and its design is flawed. It also uses 
`RowCsvInputFormat` that as I said in the previous paragraph applies custom 
block(in this case line) splitting based on configurable delimiter.
   
   Thank you for your reply and discussion. 
   
   First of all, the naming of this PR is a bit confusing. The code 
mainly supports JSON files, local or on HDFS, so I can rename it to 
JsonFileXxx.
   
   Second, Flink does have a File System Connector; the official documentation is
   
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/connect.html#file-system-connector,
 and it provides 
[CsvBatchTableSourceFactory](https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory#L16)
  to support reading CSV files.
   
![image](https://user-images.githubusercontent.com/20353043/57529676-0fc6f180-7368-11e9-9828-bb1644013972.png)
   
   There is a unit test, [FileSystem connector CSV 
Format](https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala#L59),
   and an example:
   ```java
   public static void main(String[] args) throws Exception {
       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
       BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
       FileSystem connector = new FileSystem().path("/path/to/csv/in.csv");

       OldCsv oldCsv = new OldCsv()
               .field("a", Types.STRING())
               .field("b", Types.INT())
               .field("c", Types.BOOLEAN());
       Schema schema = new Schema()
               .field("a", Types.STRING())
               .field("b", Types.INT())
               .field("c", Types.BOOLEAN());

       tEnv.connect(connector)
               .withFormat(oldCsv)
               .withSchema(schema)
               .registerTableSource("csvTable");

       String fullSql = "select * FROM csvTable";
       Table fullTable = tEnv.sqlQuery(fullSql);

       DataSet<Row> fullDS = tEnv.toDataSet(fullTable, Row.class);
       List<Row> result = fullDS.collect();

       System.out.println(result);
   }
   ```
   
   Third, Hive, for example, also requires each JSON record to be on a single 
line; otherwise the file is not supported. E.g.:
   ```json
   {
"fruit": [{
"weight": 8,
"type": "apple"
}, {
"weight": 9,
"type": "pear"
}],
"author": "J. R. R. Tolkien",
"title": "The Lord of the Rings",
"category": "fiction",
"price": 22.99,
"isbn": "0-395-19395-8",
"email": "amy@only_for_json_udf_test.net",
"owner": "amy",
"zip_code": "94025",
"fb_id": "1234"
   }
   ```
   thanks.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-10 Thread GitBox
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r282893011
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
 ##
 @@ -280,6 +316,55 @@
 */
void sqlUpdate(String stmt, QueryConfig config);
 
+   /**
+* Gets the current default catalog name of the current session.
+*
+* @return the current default catalog that is used for path resolution
+* @see TableEnvironment#setCurrentCatalog(String)
+*/
+   String getCurrentCatalogName();
 
 Review comment:
   Nit: I wonder if we should just name it getCurrentCatalog(), which returns the name 
of the current catalog. (Same for the current database name.) I understand that 
the given name is more explicit, but getCurrentCatalog() mirrors the set 
method below and is more consistent with the catalog APIs, in which every getter 
(table, database) is named getXxx() instead of getXxxName().
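
   A minimal sketch of the mirrored getter/setter naming being suggested (a 
hypothetical interface fragment, not the actual `TableEnvironment` API):
   ```java
   // Both methods deal in catalog *names* (String), not Catalog instances.
   public interface SessionCatalogAccess {
       String getCurrentCatalog();                 // returns the current catalog name
       void setCurrentCatalog(String catalogName); // switches the current catalog by name
   }
   ```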


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-10 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r282892019
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+
+/**
+ * Strategy test utilities.
+ */
+public class StrategyTestUtil {
+
+   static void initVerticesAndPartitions(
+   TestingSchedulingTopology schedulingTopology,
+   ResultPartitionType[] partitionTypes,
+   InputDependencyConstraint[] inputDependencyConstraints,
+   TestingSchedulingExecutionVertex[][] vertices,
+   TestingSchedulingResultPartition[][] partitions) {
+
+   ResultPartitionID[][] resultPartitionIds = new 
ResultPartitionID[vertices.length - 1][vertices[0].length];
+   initVerticesAndPartitions(schedulingTopology, partitionTypes, 
inputDependencyConstraints,
+   vertices, resultPartitionIds, partitions);
+   }
+
+   static void initVerticesAndPartitions(
+   TestingSchedulingTopology schedulingTopology,
+   ResultPartitionType[] partitionTypes,
+   InputDependencyConstraint[] inputDependencyConstraints,
+   TestingSchedulingExecutionVertex[][] vertices,
+   ResultPartitionID[][] resultPartitionIds,
+   TestingSchedulingResultPartition[][] partitions) {
+
+   final int jobVertexCnt = vertices.length;
+   final int taskCnt = vertices[0].length;
+
+   JobVertexID[] jobVertexIds = new JobVertexID[jobVertexCnt];
+   IntermediateDataSetID[] dataSetIds = new 
IntermediateDataSetID[jobVertexCnt];
+   for (int i = 0; i < jobVertexCnt; i++) {
+   jobVertexIds[i] = new JobVertexID();
+   dataSetIds[i] = new IntermediateDataSetID();
+   
schedulingTopology.addInputDependencyConstraint(jobVertexIds[i], 
inputDependencyConstraints[i]);
+   }
+
+   for (int i = 0; i < jobVertexCnt; i++) {
+   for (int j = 0; j < taskCnt; j++) {
+   vertices[i][j] = new 
TestingSchedulingExecutionVertex(jobVertexIds[i], j);
+   
schedulingTopology.addSchedulingExecutionVertex(vertices[i][j]);
+
+   if (i != jobVertexCnt - 1) {
+   partitions[i][j] = new 
TestingSchedulingResultPartition(dataSetIds[i], new 
IntermediateResultPartitionID(),
+   partitionTypes[i], 
vertices[i][j]);
+   resultPartitionIds[i][j] = new 
ResultPartitionID(partitions[i][j].getId(), new ExecutionAttemptID());
+   
schedulingTopology.addResultPartition(partitions[i][j].getId(), 
partitions[i][j]);
 
 Review comment:
   The method `addResultPartition` should not exist. It should only be allowed 
to add new partitions to the topology via `SchedulingExecutionVertex`. 
Otherwise we risk being able to create _"impossible"_ topologies.
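
   A hedged sketch of the kind of test API this implies, where a result partition 
can only be registered through its producing vertex (all names are hypothetical, 
not the actual `TestingSchedulingTopology`):
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   // Partitions are only added as a side effect of registering their producing
   // vertex, so a partition without a producer cannot be constructed.
   class TopologyBuilderSketch {
       private final List<String> vertexIds = new ArrayList<>();
       private final Map<String, String> partitionToProducer = new HashMap<>();
   
       void addVertex(String vertexId, List<String> producedPartitionIds) {
           vertexIds.add(vertexId);
           for (String partitionId : producedPartitionIds) {
               partitionToProducer.put(partitionId, vertexId);
           }
       }
   }
   ```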


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For 

[jira] [Commented] (FLINK-12428) Translate the "Event Time" page into Chinese

2019-05-10 Thread jack (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837302#comment-16837302
 ] 

jack commented on FLINK-12428:
--

Have you finished the task?

> Translate the "Event Time" page into Chinese
> 
>
> Key: FLINK-12428
> URL: https://issues.apache.org/jira/browse/FLINK-12428
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Affects Versions: 1.9.0
>Reporter: YangFei
>Assignee: jack
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> file locate  flink/docs/dev/event_time.zh.md
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-10 Thread GitBox
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r282887147
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java
 ##
 @@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.api.common.InputDependencyConstraint.ANY;
+import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
+import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
+import static 
org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.connectVerticesToPartition;
+import static 
org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.getExecutionVertexIdsFromDeployOptions;
+import static 
org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.initVerticesAndPartitions;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for {@link LazyFromSourcesSchedulingStrategy}.
+ */
+public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {
+
+   /**
+* Tests that when start scheduling lazy from sources scheduling 
strategy will start input vertices in scheduling topology.
+*/
+   @Test
+   public void testStartScheduling() {
+   final int jobVertexCnt = 2;
+   final int taskCnt = 3;
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
+   TestingSchedulingExecutionVertex[][] vertices = new 
TestingSchedulingExecutionVertex[jobVertexCnt][taskCnt];
+   TestingSchedulingResultPartition[][] partitions = new 
TestingSchedulingResultPartition[jobVertexCnt - 1][taskCnt];
+
+   ResultPartitionType[] partitionTypes = new 
ResultPartitionType[jobVertexCnt - 1];
+   InputDependencyConstraint[] inputDependencyConstraints = new 
InputDependencyConstraint[jobVertexCnt];
+   partitionTypes[0] = BLOCKING;
+   inputDependencyConstraints[0] = ANY;
+   inputDependencyConstraints[1] = ANY;
+
+   initVerticesAndPartitions(schedulingTopology, partitionTypes, 
inputDependencyConstraints, vertices, partitions);
+
+   for (int i = 0; i < taskCnt; i++) {
+   for (int j = 0; j < taskCnt; j++) {
+   connectVerticesToPartition(vertices[0][i], 
vertices[1][i], partitions[0][j]);
+   }
+   }
+
+   TestingSchedulerOperations testingSchedulerOperation = new 
TestingSchedulerOperations();
+   LazyFromSourcesSchedulingStrategy schedulingStrategy = new 
LazyFromSourcesSchedulingStrategy(
+   testingSchedulerOperation,
+   schedulingTopology);
+
+   schedulingStrategy.startScheduling();
+
+   Set toBeScheduledVertices = 
Arrays.stream(vertices[0])
+   
.map(TestingSchedulingExecutionVertex::getId).collect(Collectors.toSet());
+   Collection scheduledVertices = 
testingSchedulerOperation.getScheduledVertices().get(0);
+
+   
assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices),
+   containsInAnyOrder(toBeScheduledVertices.toArray()));
+   }
+
+   /**
+* Tests that when restart tasks 

[GitHub] [flink] flinkbot commented on issue #8414: [FLINK-12477][Build] Enforce maven version 3.1.1

2019-05-10 Thread GitBox
flinkbot commented on issue #8414: [FLINK-12477][Build] Enforce maven version 
3.1.1
URL: https://github.com/apache/flink/pull/8414#issuecomment-491294860
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12477) Change threading-model in StreamTask to a mailbox-based approach

2019-05-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12477:
---
Labels: pull-request-available  (was: )

> Change threading-model in StreamTask to a mailbox-based approach
> 
>
> Key: FLINK-12477
> URL: https://issues.apache.org/jira/browse/FLINK-12477
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Operators
>Reporter: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> This is the umbrella issue to change the threading-model in StreamTask to a 
> mailbox-based approach.
> You can find the corresponding design doc here:
> https://docs.google.com/document/d/1eDpsUKv2FqwZiS1Pm6gYO5eFHScBHfULKmH1-ZEWB4g
> Please comment and ask first before assigning sub-task. Thanks!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] rmetzger opened a new pull request #8414: [FLINK-12477][Build] Enforce maven version 3.1.1

2019-05-10 Thread GitBox
rmetzger opened a new pull request #8414: [FLINK-12477][Build] Enforce maven 
version 3.1.1
URL: https://github.com/apache/flink/pull/8414
 
 
   ## Contribution Checklist
   
   ## What is the purpose of the change
   
   The `frontend-maven-plugin` introduced in 
https://github.com/apache/flink/pull/8016 requires at least Maven 3.1.
   With this change, we enforce the latest stable release of Maven 3.1, i.e. 3.1.1.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] fhueske commented on a change in pull request #8354: [FLINK-12421][docs-zh] Synchronize the latest documentation changes (commits to 7b46f604) into Chinese documents

2019-05-10 Thread GitBox
fhueske commented on a change in pull request #8354: [FLINK-12421][docs-zh] 
Synchronize the latest documentation changes (commits to 7b46f604) into Chinese 
documents
URL: https://github.com/apache/flink/pull/8354#discussion_r282885332
 
 

 ##
 File path: docs/dev/connectors/kafka.zh.md
 ##
 @@ -88,7 +88,7 @@ For most users, the `FlinkKafkaConsumer08` (part of 
`flink-connector-kafka`) is
 >= 1.0.0
 
 This universal Kafka connector attempts to track the latest version of 
the Kafka client.
-The version of the client it uses may change between Flink releases.
+The version of the client it uses may change between Flink releases. 
As of this release, it uses the Kafka 2.2.0 client.
 
 Review comment:
   I think we should not reference `As of this release`. In six months, nobody 
will know what this refers to.
   Could you replace that with the correct version (presumably 1.8) in both 
languages?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-12428) Translate the "Event Time" page into Chinese

2019-05-10 Thread jack (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jack reassigned FLINK-12428:


Assignee: jack  (was: YangFei)

> Translate the "Event Time" page into Chinese
> 
>
> Key: FLINK-12428
> URL: https://issues.apache.org/jira/browse/FLINK-12428
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Affects Versions: 1.9.0
>Reporter: YangFei
>Assignee: jack
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> file locate  flink/docs/dev/event_time.zh.md
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12490) Introduce NetworkInput class

2019-05-10 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-12490:
--

 Summary: Introduce NetworkInput class
 Key: FLINK-12490
 URL: https://issues.apache.org/jira/browse/FLINK-12490
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.8.0
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski


NetworkInput will take care of record deserialisation and barrier alignment. 
It will be used by InputProcessors (One, TwoInput and TwoInputSelectable) to 
access buffers from InputGates. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] GJL closed pull request #8130: [FLINK-12053][runtime] Calculate job vertex parallelism based on metrics

2019-05-10 Thread GitBox
GJL closed pull request #8130: [FLINK-12053][runtime] Calculate job vertex 
parallelism based on metrics
URL: https://github.com/apache/flink/pull/8130
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

