[jira] [Commented] (FLINK-12380) Add thread name in the log4j.properties
[ https://issues.apache.org/jira/browse/FLINK-12380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837766#comment-16837766 ] Yun Tang commented on FLINK-12380: -- [~azagrebin] If so, why does the current [logback.xml|https://github.com/apache/flink/blob/a7cf24383be9f310fb5ccc5a032721421fa45791/flink-dist/src/main/flink-bin/conf/logback.xml#L24] already contain the thread name? I just replied to Stephan in the closed PR about this confusion. > Add thread name in the log4j.properties > --- > > Key: FLINK-12380 > URL: https://issues.apache.org/jira/browse/FLINK-12380 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > This is inspired by FLINK-12368, where users want to add sub-task index > information in the source code. We could add the thread name, which already > contains sub-task index information, to the logs to avoid having to change the > source code. > Moreover, I found that the existing {{logback.xml}} in Flink already contains {{thread > name}} information. We should also add this to {{log4j.properties}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
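Once the thread name is part of every log line (in a log4j 1.x `PatternLayout`, the `%t` conversion character prints it), the sub-task index can be read from the logs without changing user code. The sketch below shows how such a name could be parsed downstream. It assumes task threads are named like `Map (2/4)`, possibly followed by an attempt suffix; that naming format is an assumption, not a documented contract, and `SubtaskFromThreadName` is a hypothetical helper.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: recovers the "(index/parallelism)"
// group that task thread names are assumed to carry, e.g. "Map (2/4)".
final class SubtaskFromThreadName {

    // Matches one "(i/n)" group; parse() keeps the last match in the name.
    private static final Pattern SUBTASK = Pattern.compile("\\((\\d+)/(\\d+)\\)");

    /** Returns {subtaskIndex, parallelism} as printed in the name, or empty if none is found. */
    static Optional<int[]> parse(String threadName) {
        Matcher m = SUBTASK.matcher(threadName);
        int[] last = null;
        while (m.find()) {
            last = new int[] {Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2))};
        }
        return Optional.ofNullable(last);
    }

    public static void main(String[] args) {
        String[] names = {"Map (2/4)", "Source: Custom Source (1/2)#0", "main"};
        for (String name : names) {
            String parsed = parse(name).map(a -> a[0] + "/" + a[1]).orElse("no subtask info");
            System.out.println(name + " -> " + parsed);
        }
    }
}
```

Keeping the last match rather than the first is a design choice for names whose operator label might itself contain parentheses.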
[GitHub] [flink] Myasuka commented on issue #8328: [FLINK-12380] Add thread name in the log4j.properties
Myasuka commented on issue #8328: [FLINK-12380] Add thread name in the log4j.properties URL: https://github.com/apache/flink/pull/8328#issuecomment-491482568 @StephanEwen As the founder of Flink and its top contributor, you have my trust, and I respect your choice. However, your reply did not convince me. I hesitated over whether to voice my thoughts, but since Apache Flink is open source software, I believe every word counts. 1. > We kept it stable for a while now, which seems to have been generally appreciated by users. I am not sure about other users, but we may run the largest Flink cluster in the world, and we explicitly add the thread name in `log4j.properties`. 2. [logback.xml](https://github.com/apache/flink/blob/a7cf24383be9f310fb5ccc5a032721421fa45791/flink-dist/src/main/flink-bin/conf/logback.xml#L24) already contains the thread name. Last but not least, if your choice is based on "Even though `log4j` and `logback` have different log patterns, the logging setup for Flink has worked well so far, so we should not touch it without strong demand," that would be acceptable to me, and I hope no further requests come up to add sub-task index information to the logs. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] XuQianJin-Stars commented on issue #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime
XuQianJin-Stars commented on issue #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime URL: https://github.com/apache/flink/pull/8244#issuecomment-491482291 hi @wuchong Thank you very much. Comments addressed.
[GitHub] [flink] godfreyhe commented on issue #8317: [FLINK-12371] [table-planner-blink] Add support for converting (NOT) IN/ (NOT) EXISTS to semi-join/anti-join, and generating optimized logical plan
godfreyhe commented on issue #8317: [FLINK-12371] [table-planner-blink] Add support for converting (NOT) IN/ (NOT) EXISTS to semi-join/anti-join, and generating optimized logical plan URL: https://github.com/apache/flink/pull/8317#issuecomment-491481206 @KurtYoung I have fixed the failed test cases; the remaining failure is in the `flink-yarn-tests` module.
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r283083182 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final Map<ExecutionVertexID, DeploymentOption> deploymentOptions; + + private final Map<IntermediateDataSetID, SchedulingIntermediateDataSet> intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.intermediateDataSets = new HashMap<>(); + this.deploymentOptions = new HashMap<>(); + } + + @Override + public void startScheduling() { + List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new ArrayList<>(); + DeploymentOption updateOption = new DeploymentOption(true); + DeploymentOption nonUpdateOption = new DeploymentOption(false); + + for (SchedulingExecutionVertex schedulingVertex : schedulingTopology.getVertices()) { + DeploymentOption option = nonUpdateOption; + for (SchedulingResultPartition srp : schedulingVertex.getProducedResultPartitions()) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.addSchedulingResultPartition(srp); + if (srp.getPartitionType().isPipelined()) { + option = updateOption; + } + } + deploymentOptions.put(schedulingVertex.getId(), option); + + if (schedulingVertex.getConsumedResultPartitions().isEmpty()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), option)); + } + } + + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void restartTasks(Set<ExecutionVertexID> verticesToRestart) { + // increase counter of the dataset first + for (ExecutionVertexID
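Stripped of Flink's types, the `startScheduling()` method quoted above makes two decisions per vertex: a vertex gets the "update" `DeploymentOption` if any partition it produces is pipelined, and only vertices that consume no result partitions (the sources) are deployed immediately. The sketch below models just those two rules. `Vertex` and `Deployment` are hypothetical stand-ins for `SchedulingExecutionVertex` and `ExecutionVertexDeploymentOption`, and the sketch omits the bookkeeping in `deploymentOptions` and `intermediateDataSets` that the real class keeps for later scheduling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, dependency-free model of the two rules in startScheduling();
// none of these types are Flink's API.
final class LazyFromSourcesSketch {

    static final class Vertex {
        final String id;
        final int consumedPartitions;          // number of result partitions this vertex reads
        final List<Boolean> producedPipelined; // one entry per produced partition: true if pipelined

        Vertex(String id, int consumedPartitions, Boolean... producedPipelined) {
            this.id = id;
            this.consumedPartitions = consumedPartitions;
            this.producedPipelined = Arrays.asList(producedPipelined);
        }
    }

    static final class Deployment {
        final String vertexId;
        final boolean update; // mirrors updateOption (true) vs. nonUpdateOption (false)

        Deployment(String vertexId, boolean update) {
            this.vertexId = vertexId;
            this.update = update;
        }
    }

    static List<Deployment> startScheduling(List<Vertex> topology) {
        List<Deployment> toDeploy = new ArrayList<>();
        for (Vertex v : topology) {
            // Any pipelined output switches the vertex to the "update" option.
            boolean update = v.producedPipelined.contains(Boolean.TRUE);
            // Only vertices without consumed partitions (sources) are deployed up front.
            if (v.consumedPartitions == 0) {
                toDeploy.add(new Deployment(v.id, update));
            }
        }
        return toDeploy;
    }

    public static void main(String[] args) {
        List<Vertex> topology = Arrays.asList(
                new Vertex("sourceA", 0, true),  // produces a pipelined partition
                new Vertex("sourceB", 0, false), // produces a blocking partition
                new Vertex("sink", 2));          // consumes both, produces nothing
        for (Deployment d : startScheduling(topology)) {
            System.out.println(d.vertexId + " update=" + d.update);
        }
    }
}
```

Downstream vertices such as `sink` are left to be scheduled later, when their inputs become available, which is what makes the strategy "lazy from sources".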
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r283082959 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; + +import java.util.Collection; +import java.util.stream.Collectors; + +/** + * Strategy test utilities. 
+ */ +public class StrategyTestUtil { + + static void initVerticesAndPartitions( + TestingSchedulingTopology schedulingTopology, + ResultPartitionType[] partitionTypes, + InputDependencyConstraint[] inputDependencyConstraints, + TestingSchedulingExecutionVertex[][] vertices, + TestingSchedulingResultPartition[][] partitions) { + + ResultPartitionID[][] resultPartitionIds = new ResultPartitionID[vertices.length - 1][vertices[0].length]; + initVerticesAndPartitions(schedulingTopology, partitionTypes, inputDependencyConstraints, + vertices, resultPartitionIds, partitions); + } + + static void initVerticesAndPartitions( + TestingSchedulingTopology schedulingTopology, + ResultPartitionType[] partitionTypes, + InputDependencyConstraint[] inputDependencyConstraints, + TestingSchedulingExecutionVertex[][] vertices, + ResultPartitionID[][] resultPartitionIds, + TestingSchedulingResultPartition[][] partitions) { + + final int jobVertexCnt = vertices.length; + final int taskCnt = vertices[0].length; + + JobVertexID[] jobVertexIds = new JobVertexID[jobVertexCnt]; + IntermediateDataSetID[] dataSetIds = new IntermediateDataSetID[jobVertexCnt]; + for (int i = 0; i < jobVertexCnt; i++) { + jobVertexIds[i] = new JobVertexID(); + dataSetIds[i] = new IntermediateDataSetID(); + schedulingTopology.addInputDependencyConstraint(jobVertexIds[i], inputDependencyConstraints[i]); + } + + for (int i = 0; i < jobVertexCnt; i++) { + for (int j = 0; j < taskCnt; j++) { + vertices[i][j] = new TestingSchedulingExecutionVertex(jobVertexIds[i], j); + schedulingTopology.addSchedulingExecutionVertex(vertices[i][j]); + + if (i != jobVertexCnt - 1) { + partitions[i][j] = new TestingSchedulingResultPartition(dataSetIds[i], new IntermediateResultPartitionID(), + partitionTypes[i], vertices[i][j]); + resultPartitionIds[i][j] = new ResultPartitionID(partitions[i][j].getId(), new ExecutionAttemptID()); + schedulingTopology.addResultPartition(partitions[i][j].getId(), partitions[i][j]); Review comment: OK 
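The nested loop in `initVerticesAndPartitions` lays the test topology out as a grid: `vertices[i][j]` is subtask `j` of job vertex `i`, and every job vertex except the last produces one partition per subtask, which is why the partition array is sized `[jobVertexCnt - 1][taskCnt]`. A stripped-down sketch of that layout, using hypothetical `String` ids and a hypothetical `Partition` holder in place of the testing classes:

```java
// Hypothetical sketch of the grid layout built by initVerticesAndPartitions;
// plain strings replace TestingSchedulingExecutionVertex/TestingSchedulingResultPartition.
final class TopologyGridSketch {

    static final class Partition {
        final String producer; // id of the vertex that produces this partition

        Partition(String producer) {
            this.producer = producer;
        }
    }

    static void initVerticesAndPartitions(String[][] vertices, Partition[][] partitions) {
        final int jobVertexCnt = vertices.length;
        final int taskCnt = vertices[0].length;
        for (int i = 0; i < jobVertexCnt; i++) {
            for (int j = 0; j < taskCnt; j++) {
                vertices[i][j] = "jobVertex" + i + "/subtask" + j;
                // The last job vertex is the sink, so it produces no partition.
                if (i != jobVertexCnt - 1) {
                    partitions[i][j] = new Partition(vertices[i][j]);
                }
            }
        }
    }

    public static void main(String[] args) {
        final int jobVertexCnt = 2;
        final int taskCnt = 3;
        String[][] vertices = new String[jobVertexCnt][taskCnt];
        Partition[][] partitions = new Partition[jobVertexCnt - 1][taskCnt];
        initVerticesAndPartitions(vertices, partitions);
        System.out.println(partitions[0][2].producer);
    }
}
```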
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r283082955 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java ## @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.api.common.InputDependencyConstraint.ANY; +import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING; +import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED; +import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.connectVerticesToPartition; +import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.getExecutionVertexIdsFromDeployOptions; +import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.initVerticesAndPartitions; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +/** + * Unit tests for {@link LazyFromSourcesSchedulingStrategy}. + */ +public class LazyFromSourcesSchedulingStrategyTest extends TestLogger { + + /** +* Tests that when start scheduling lazy from sources scheduling strategy will start input vertices in scheduling topology. 
+*/ + @Test + public void testStartScheduling() { + final int jobVertexCnt = 2; + final int taskCnt = 3; + TestingSchedulingTopology schedulingTopology = new TestingSchedulingTopology(); + TestingSchedulingExecutionVertex[][] vertices = new TestingSchedulingExecutionVertex[jobVertexCnt][taskCnt]; + TestingSchedulingResultPartition[][] partitions = new TestingSchedulingResultPartition[jobVertexCnt - 1][taskCnt]; + + ResultPartitionType[] partitionTypes = new ResultPartitionType[jobVertexCnt - 1]; + InputDependencyConstraint[] inputDependencyConstraints = new InputDependencyConstraint[jobVertexCnt]; + partitionTypes[0] = BLOCKING; + inputDependencyConstraints[0] = ANY; + inputDependencyConstraints[1] = ANY; + + initVerticesAndPartitions(schedulingTopology, partitionTypes, inputDependencyConstraints, vertices, partitions); + + for (int i = 0; i < taskCnt; i++) { + for (int j = 0; j < taskCnt; j++) { + connectVerticesToPartition(vertices[0][i], vertices[1][i], partitions[0][j]); + } + } + + TestingSchedulerOperations testingSchedulerOperation = new TestingSchedulerOperations(); + LazyFromSourcesSchedulingStrategy schedulingStrategy = new LazyFromSourcesSchedulingStrategy( + testingSchedulerOperation, + schedulingTopology); + + schedulingStrategy.startScheduling(); + + Set<ExecutionVertexID> toBeScheduledVertices = Arrays.stream(vertices[0]) + .map(TestingSchedulingExecutionVertex::getId).collect(Collectors.toSet()); + Collection<ExecutionVertexDeploymentOption> scheduledVertices = testingSchedulerOperation.getScheduledVertices().get(0); + + assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices), + containsInAnyOrder(toBeScheduledVertices.toArray())); + } + + /** +* Tests that when
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r283082635 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,296 @@
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r283082502 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,296 @@
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r283082509 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,296 @@
[jira] [Commented] (FLINK-8513) Add documentation for connecting to non-AWS S3 endpoints
[ https://issues.apache.org/jira/browse/FLINK-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837692#comment-16837692 ] Stephan Ewen commented on FLINK-8513: - Fixed in {{master}} in 4dae94d952958858af722c6285398bc6402708e4 > Add documentation for connecting to non-AWS S3 endpoints > > > Key: FLINK-8513 > URL: https://issues.apache.org/jira/browse/FLINK-8513 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem, Documentation >Reporter: chris snow >Assignee: Seth Wiesman >Priority: Trivial > > It would be useful if the documentation provided information on connecting to > non-AWS S3 endpoints when using presto. For example: > > > You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's > {{flink-conf.yaml}}: > {code:java} > s3.access-key: your-access-key > s3.secret-key: your-secret-key{code} > If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object > Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 > endpoint in Flink's {{flink-conf.yaml}}: > {code:java} > s3.endpoint: your-endpoint-hostname{code} > > > Source: > [https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md]
[jira] [Commented] (FLINK-10249) Document hadoop/presto s3 file system configuration forwarding
[ https://issues.apache.org/jira/browse/FLINK-10249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837691#comment-16837691 ] Stephan Ewen commented on FLINK-10249: -- Fixed in {{master}} in 7d45b756b8bccbbacf6b7b9ca50a7ede8ed01a5c > Document hadoop/presto s3 file system configuration forwarding > -- > > Key: FLINK-10249 > URL: https://issues.apache.org/jira/browse/FLINK-10249 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem, Documentation >Reporter: Andrey Zagrebin >Assignee: Seth Wiesman >Priority: Minor > > Flink's Hadoop and Presto S3 file system factories (S3FileSystemFactory) use > HadoopConfigLoader, which automatically converts and prefixes s3.* config > options to configure the underlying S3 clients. We can at least leave a hint > about this behaviour for users who want to change the configuration of these underlying > S3 clients, e.g. in docs/ops/deployment/aws.md
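As a rough illustration of the "converts and prefixes" behaviour described in FLINK-10249, here is a hedged sketch. It is not Flink's HadoopConfigLoader: the class and method names are hypothetical, and the target prefix `fs.s3a.` is an assumption chosen because it is the key namespace read by Hadoop's S3A client. The real loader may accept further source prefixes or rename some keys, which this sketch does not model.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplification of forwarding Flink "s3.*" options to an S3 client
// configuration; not the actual HadoopConfigLoader implementation.
class S3ConfigForwardingSketch {

    static final String FLINK_PREFIX = "s3.";

    // Returns a new map holding every "s3.*" entry of 'flinkConfig', re-keyed under
    // 'clientPrefix' (e.g. "s3.endpoint" -> clientPrefix + "endpoint").
    // Keys without the "s3." prefix are not forwarded.
    static Map<String, String> forward(Map<String, String> flinkConfig, String clientPrefix) {
        Map<String, String> clientConfig = new HashMap<>();
        for (Map.Entry<String, String> entry : flinkConfig.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(FLINK_PREFIX)) {
                clientConfig.put(clientPrefix + key.substring(FLINK_PREFIX.length()), entry.getValue());
            }
        }
        return clientConfig;
    }
}
```

Under this model, the `s3.endpoint: your-endpoint-hostname` entry from FLINK-8513 would reach the client as `fs.s3a.endpoint`, while unrelated options such as `taskmanager.numberOfTaskSlots` are left out.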
[jira] [Commented] (FLINK-12388) Update the production readiness checklist
[ https://issues.apache.org/jira/browse/FLINK-12388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837690#comment-16837690 ] Stephan Ewen commented on FLINK-12388: -- Fixed in {{master}} in 754cd71dd92dfcfff3e1ef23083790422188ce9e > Update the production readiness checklist > -- > > Key: FLINK-12388 > URL: https://issues.apache.org/jira/browse/FLINK-12388 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Seth Wiesman >Assignee: Seth Wiesman >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > The production readiness checklist has grown organically over the years, and > while it provides valuable information, the content does not flow cohesively > as it has been worked on by a number of users. > We should improve the overall structure and readability of the checklist and > also update any outdated information.
[jira] [Closed] (FLINK-12070) Make blocking result partitions consumable multiple times
[ https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-12070. > Make blocking result partitions consumable multiple times > - > > Key: FLINK-12070 > URL: https://issues.apache.org/jira/browse/FLINK-12070 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Till Rohrmann >Assignee: Stephan Ewen >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: image-2019-04-18-17-38-24-949.png > > Time Spent: 20m > Remaining Estimate: 0h > > In order to avoid writing produced results multiple times for multiple > consumers and to speed up batch recoveries, we should make > blocking result partitions consumable multiple times. At the moment, a > blocking result partition is released once its consumers have processed > all data. Instead, the result partition should be released once the next > blocking result has been produced and all consumers of a blocking result > partition have terminated. Moreover, blocking results should not hold on to slot > resources like network buffers or memory, as is currently the case with > {{SpillableSubpartitions}}.
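The release rule proposed in FLINK-12070 (keep the data until the next blocking result has been produced and all consumers have terminated, rather than dropping it after one full read) can be sketched as a small state machine. This is a hypothetical in-memory model: `ReusableBlockingPartition` and its methods are illustrative names, not Flink classes, and it ignores spilling, network buffers, and consumers that restart after having finished.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the release rule proposed in FLINK-12070; not Flink's
// ResultPartition or subpartition classes.
class ReusableBlockingPartition {
    private final List<String> records = new ArrayList<>();
    private final int expectedConsumers;
    private int finishedConsumers;
    private boolean nextBlockingResultProduced;
    private boolean released;

    ReusableBlockingPartition(int expectedConsumers) {
        this.expectedConsumers = expectedConsumers;
    }

    void write(String record) {
        records.add(record);
    }

    // Each call returns a copy of the full data: reading does not consume it,
    // so several consumers (or a re-executed consumer) can read it again.
    List<String> read() {
        if (released) {
            throw new IllegalStateException("partition already released");
        }
        return new ArrayList<>(records);
    }

    void onConsumerFinished() {
        finishedConsumers++;
        maybeRelease();
    }

    void onNextBlockingResultProduced() {
        nextBlockingResultProduced = true;
        maybeRelease();
    }

    // Release only when BOTH conditions hold: the next blocking result exists
    // and every consumer of this partition has terminated.
    private void maybeRelease() {
        if (!released && nextBlockingResultProduced && finishedConsumers >= expectedConsumers) {
            records.clear();
            released = true;
        }
    }

    boolean isReleased() {
        return released;
    }
}
```

Keeping the data after the first read is what allows a failed downstream task to re-read its input during batch recovery instead of forcing the producer to run again.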
[jira] [Commented] (FLINK-12378) Consolidate FileSystem Documentation
[ https://issues.apache.org/jira/browse/FLINK-12378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837694#comment-16837694 ] Stephan Ewen commented on FLINK-12378: -- Fixed in {{master}} in b50896a55288b363fd369368c0823abbe99aa36c > Consolidate FileSystem Documentation > > > Key: FLINK-12378 > URL: https://issues.apache.org/jira/browse/FLINK-12378 > Project: Flink > Issue Type: Improvement > Components: Documentation, FileSystems >Reporter: Seth Wiesman >Assignee: Seth Wiesman >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0, 1.8.1 > > Time Spent: 10m > Remaining Estimate: 0h > > Currently, Flink's filesystem documentation is spread across a number of pages > without any clear connection. A non-exhaustive list of issues includes: > * S3 documentation is spread across many pages > * OSS filesystem is listed under deployments when it is an object store > * deployments/filesystem.md has a lot of unrelated information > We should create a filesystem subsection under deployments with multiple > pages containing all relevant information about Flink's filesystem > abstraction.
[jira] [Updated] (FLINK-12378) Consolidate FileSystem Documentation
[ https://issues.apache.org/jira/browse/FLINK-12378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen updated FLINK-12378: - Fix Version/s: 1.8.1 1.9.0 > Consolidate FileSystem Documentation > > > Key: FLINK-12378 > URL: https://issues.apache.org/jira/browse/FLINK-12378 > Project: Flink > Issue Type: Improvement > Components: Documentation, FileSystems >Reporter: Seth Wiesman >Assignee: Seth Wiesman >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0, 1.8.1 > > Time Spent: 10m > Remaining Estimate: 0h > > Currently, Flink's filesystem documentation is spread across a number of pages > without any clear connection. A non-exhaustive list of issues includes: > * S3 documentation is spread across many pages > * OSS filesystem is listed under deployments when it is an object store > * deployments/filesystem.md has a lot of unrelated information > We should create a filesystem subsection under deployments with multiple > pages containing all relevant information about Flink's filesystem > abstraction.
[jira] [Updated] (FLINK-8513) Add documentation for connecting to non-AWS S3 endpoints
[ https://issues.apache.org/jira/browse/FLINK-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen updated FLINK-8513: Fix Version/s: 1.8.1 1.9.0 > Add documentation for connecting to non-AWS S3 endpoints > > > Key: FLINK-8513 > URL: https://issues.apache.org/jira/browse/FLINK-8513 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem, Documentation >Reporter: chris snow >Assignee: Seth Wiesman >Priority: Trivial > Fix For: 1.9.0, 1.8.1 > > > It would be useful if the documentation provided information on connecting to > non-AWS S3 endpoints when using Presto. For example: > > > You need to configure both {{s3.access-key}} and {{s3.secret-key}} in Flink's > {{flink-conf.yaml}}: > {code:java} > s3.access-key: your-access-key > s3.secret-key: your-secret-key{code} > If you are using a non-AWS S3 endpoint (such as [IBM's Cloud Object > Storage|https://www.ibm.com/cloud/object-storage]), you can configure the S3 > endpoint in Flink's {{flink-conf.yaml}}: > {code:java} > s3.endpoint: your-endpoint-hostname{code} > > > Source: > [https://github.com/apache/flink/blob/master/docs/ops/deployment/aws.md]
[jira] [Resolved] (FLINK-12070) Make blocking result partitions consumable multiple times
[ https://issues.apache.org/jira/browse/FLINK-12070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-12070. -- Resolution: Implemented Assignee: Stephan Ewen (was: BoWang) Fix Version/s: 1.9.0 Implemented in a7cf24383be9f310fb5ccc5a032721421fa45791 > Make blocking result partitions consumable multiple times > - > > Key: FLINK-12070 > URL: https://issues.apache.org/jira/browse/FLINK-12070 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Till Rohrmann >Assignee: Stephan Ewen >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: image-2019-04-18-17-38-24-949.png > > Time Spent: 20m > Remaining Estimate: 0h > > In order to avoid writing produced results multiple times for multiple > consumers and to speed up batch recoveries, we should make > blocking result partitions consumable multiple times. At the moment, a > blocking result partition is released once its consumers have processed > all data. Instead, the result partition should be released once the next > blocking result has been produced and all consumers of a blocking result > partition have terminated. Moreover, blocking results should not hold on to slot > resources like network buffers or memory, as is currently the case with > {{SpillableSubpartitions}}.
[jira] [Updated] (FLINK-12378) Consolidate FileSystem Documentation
[ https://issues.apache.org/jira/browse/FLINK-12378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12378: --- Labels: pull-request-available (was: ) > Consolidate FileSystem Documentation > > > Key: FLINK-12378 > URL: https://issues.apache.org/jira/browse/FLINK-12378 > Project: Flink > Issue Type: Improvement > Components: Documentation, FileSystems >Reporter: Seth Wiesman >Assignee: Seth Wiesman >Priority: Major > Labels: pull-request-available > > Currently, Flink's filesystem documentation is spread across a number of pages > without any clear connection. A non-exhaustive list of issues includes: > * S3 documentation is spread across many pages > * OSS filesystem is listed under deployments when it is an object store > * deployments/filesystem.md has a lot of unrelated information > We should create a filesystem subsection under deployments with multiple > pages containing all relevant information about Flink's filesystem > abstraction.
[GitHub] [flink] asfgit closed pull request #8326: [FLINK-12378][docs] Consolidate FileSystem Documentation
asfgit closed pull request #8326: [FLINK-12378][docs] Consolidate FileSystem Documentation URL: https://github.com/apache/flink/pull/8326
[GitHub] [flink] StephanEwen commented on issue #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions
StephanEwen commented on issue #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions URL: https://github.com/apache/flink/pull/8290#issuecomment-491457613 Manually merged in a7cf24383be9f310fb5ccc5a032721421fa45791
[GitHub] [flink] asfgit closed pull request #8330: [FLINK-12388][docs] Update the production readiness checklist
asfgit closed pull request #8330: [FLINK-12388][docs] Update the production readiness checklist URL: https://github.com/apache/flink/pull/8330
[GitHub] [flink] StephanEwen closed pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions
StephanEwen closed pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions URL: https://github.com/apache/flink/pull/8290
[GitHub] [flink] bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r283063574 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManager.java ## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.CatalogAlreadyExistsException; +import org.apache.flink.table.api.CatalogNotExistException; +import org.apache.flink.table.api.ExternalCatalogAlreadyExistException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.operations.CatalogTableOperation; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A CatalogManager that encapsulates all available catalogs. It also implements the logic of + * table path resolution. Supports both new API ({@link ReadableCatalog} as well as {@link ExternalCatalog}. + */ +@Internal +public class CatalogManager { + private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class); + + // A map between names and catalogs. 
+ private Map<String, Catalog> catalogs; + + // TO BE REMOVED along with ExternalCatalog API + private Map<String, ExternalCatalog> externalCatalogs; + + // The name of the default catalog and schema + private String currentCatalogName; + + private String currentDatabaseName; + + public CatalogManager(String defaultCatalogName, Catalog defaultCatalog) { + catalogs = new LinkedHashMap<>(); + externalCatalogs = new LinkedHashMap<>(); + catalogs.put(defaultCatalogName, defaultCatalog); + this.currentCatalogName = defaultCatalogName; + this.currentDatabaseName = defaultCatalog.getCurrentDatabase(); + } + + /** +* Registers a catalog under the given name. The catalog name must be unique across both +* {@link Catalog}s and {@link ExternalCatalog}s. +* +* @param catalogName name under which to register the given catalog +* @param catalog catalog to register +* @throws CatalogAlreadyExistsException thrown if the name is already taken +*/ + public void registerCatalog(String catalogName, Catalog catalog) throws CatalogAlreadyExistsException { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "Catalog name cannot be null or empty."); + checkNotNull(catalog, "Catalog cannot be null"); + + if (catalogs.containsKey(catalogName) || externalCatalogs.containsKey(catalogName)) { + throw new CatalogAlreadyExistsException(catalogName); + } + + catalogs.put(catalogName, catalog); + catalog.open(); + } + + /** +* Gets a catalog by name. +* +* @param catalogName name of the catalog to retrieve +* @return the requested catalog +* @throws CatalogNotExistException thrown if the catalog doesn't exist +* @see CatalogManager#getExternalCatalog(String) +*/ + public Catalog getCatalog(String catalogName) throws CatalogNotExistException { + if (!catalogs.keySet().contains(catalogName)) { + throw new CatalogNotExistException(catalogName); + } + + return catalogs.get(catalogName); + } + + /** +* Registers an external catalog under the given
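The uniqueness rule stated in the `registerCatalog` Javadoc above (a name must be free among both `Catalog`s and `ExternalCatalog`s) can be exercised without Flink on the classpath. The following is a hypothetical sketch, not the reviewed `CatalogManager`: type parameters stand in for Flink's catalog interfaces, unchecked exceptions replace Flink's checked `CatalogAlreadyExistsException` and `CatalogNotExistException`, and `catalog.open()` is omitted.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, Flink-free model of CatalogManager's name handling.
// C and E stand in for Catalog and ExternalCatalog.
class CatalogRegistrySketch<C, E> {
    // LinkedHashMap keeps registration order, as in the reviewed code.
    private final Map<String, C> catalogs = new LinkedHashMap<>();
    private final Map<String, E> externalCatalogs = new LinkedHashMap<>();

    void registerCatalog(String name, C catalog) {
        checkNewName(name);
        if (catalog == null) {
            throw new NullPointerException("Catalog cannot be null");
        }
        catalogs.put(name, catalog);
    }

    void registerExternalCatalog(String name, E externalCatalog) {
        checkNewName(name);
        if (externalCatalog == null) {
            throw new NullPointerException("External catalog cannot be null");
        }
        externalCatalogs.put(name, externalCatalog);
    }

    // A name must be non-blank and unused in BOTH maps.
    private void checkNewName(String name) {
        if (name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException("Catalog name cannot be null or empty.");
        }
        if (catalogs.containsKey(name) || externalCatalogs.containsKey(name)) {
            throw new IllegalStateException("Catalog " + name + " already exists.");
        }
    }

    // Null values are rejected at registration, so a single get() suffices.
    C getCatalog(String name) {
        C catalog = catalogs.get(name);
        if (catalog == null) {
            throw new IllegalArgumentException("Catalog " + name + " does not exist.");
        }
        return catalog;
    }
}
```

Because `null` catalogs can never be stored, `getCatalog` can use one `get()` call instead of the `keySet().contains(...)` check followed by `get()`.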
[GitHub] [flink] bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r283062024 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManager.java ## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.CatalogAlreadyExistsException; +import org.apache.flink.table.api.CatalogNotExistException; +import org.apache.flink.table.api.ExternalCatalogAlreadyExistException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.operations.CatalogTableOperation; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A CatalogManager that encapsulates all available catalogs. It also implements the logic of + * table path resolution. Supports both new API ({@link ReadableCatalog} as well as {@link ExternalCatalog}. Review comment: "Catalog", not "ReadableCatalog"
[GitHub] [flink] bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r283060689 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java ## @@ -51,7 +51,7 @@ public static final String DEFAULT_DB = "default"; - private String currentDatabase = DEFAULT_DB; + private String currentDatabase; Review comment: can you rebase the in-memory catalog to https://github.com/apache/flink/pull/8390 ?
[GitHub] [flink] bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r283062301 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManager.java ## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.CatalogAlreadyExistsException; +import org.apache.flink.table.api.CatalogNotExistException; +import org.apache.flink.table.api.ExternalCatalogAlreadyExistException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.operations.CatalogTableOperation; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A CatalogManager that encapsulates all available catalogs. It also implements the logic of + * table path resolution. Supports both new API ({@link ReadableCatalog} as well as {@link ExternalCatalog}. + */ +@Internal +public class CatalogManager { + private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class); + + // A map between names and catalogs. + private Map<String, Catalog> catalogs; + + // TO BE REMOVED along with ExternalCatalog API + private Map<String, ExternalCatalog> externalCatalogs; + + // The name of the default catalog and schema + private String currentCatalogName; + + private String currentDatabaseName; + + public CatalogManager(String defaultCatalogName, Catalog defaultCatalog) { Review comment: validate defaultCatalogName and defaultCatalog?
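The reviewer's suggestion to validate `defaultCatalogName` and `defaultCatalog` could look roughly like the following. This is a hedged sketch, not the code that was eventually merged: `ValidatingManagerSketch` and `SimpleCatalog` are hypothetical names, and the checks mirror the ones `registerCatalog` already applies, with plain JDK calls in place of Flink's `Preconditions` and `StringUtils`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of a constructor that validates its arguments the same way
// registerCatalog does; not the merged CatalogManager code.
class ValidatingManagerSketch {

    // Minimal stand-in for Flink's Catalog: only the method the constructor calls.
    interface SimpleCatalog {
        String getCurrentDatabase();
    }

    private final Map<String, SimpleCatalog> catalogs = new LinkedHashMap<>();
    private final String currentCatalogName;
    private final String currentDatabaseName;

    ValidatingManagerSketch(String defaultCatalogName, SimpleCatalog defaultCatalog) {
        // Reject a null or blank name before anything is stored.
        if (defaultCatalogName == null || defaultCatalogName.trim().isEmpty()) {
            throw new IllegalArgumentException("Default catalog name cannot be null or empty.");
        }
        // Fail fast with a clear message instead of an NPE from getCurrentDatabase() below.
        Objects.requireNonNull(defaultCatalog, "Default catalog cannot be null");

        catalogs.put(defaultCatalogName, defaultCatalog);
        this.currentCatalogName = defaultCatalogName;
        this.currentDatabaseName = defaultCatalog.getCurrentDatabase();
    }

    String getCurrentCatalogName() {
        return currentCatalogName;
    }

    String getCurrentDatabaseName() {
        return currentDatabaseName;
    }
}
```

Without such checks, a `null` catalog only surfaces as a `NullPointerException` from `defaultCatalog.getCurrentDatabase()`, and a `null` name is accepted silently because `LinkedHashMap` permits a `null` key.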
[GitHub] [flink] bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r283062652 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManager.java ## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.CatalogAlreadyExistsException; +import org.apache.flink.table.api.CatalogNotExistException; +import org.apache.flink.table.api.ExternalCatalogAlreadyExistException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.operations.CatalogTableOperation; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A CatalogManager that encapsulates all available catalogs. It also implements the logic of + * table path resolution. Supports both new API ({@link ReadableCatalog} as well as {@link ExternalCatalog}. + */ +@Internal +public class CatalogManager { + private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class); + + // A map between names and catalogs. 
+ private Map<String, Catalog> catalogs; + + // TO BE REMOVED along with ExternalCatalog API + private Map<String, ExternalCatalog> externalCatalogs; + + // The name of the default catalog and schema + private String currentCatalogName; + + private String currentDatabaseName; + + public CatalogManager(String defaultCatalogName, Catalog defaultCatalog) { + catalogs = new LinkedHashMap<>(); + externalCatalogs = new LinkedHashMap<>(); + catalogs.put(defaultCatalogName, defaultCatalog); + this.currentCatalogName = defaultCatalogName; + this.currentDatabaseName = defaultCatalog.getCurrentDatabase(); + } + + /** +* Registers a catalog under the given name. The catalog name must be unique across both +* {@link Catalog}s and {@link ExternalCatalog}s. +* +* @param catalogName name under which to register the given catalog +* @param catalog catalog to register +* @throws CatalogAlreadyExistsException thrown if the name is already taken +*/ + public void registerCatalog(String catalogName, Catalog catalog) throws CatalogAlreadyExistsException { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "Catalog name cannot be null or empty."); + checkNotNull(catalog, "Catalog cannot be null"); + + if (catalogs.containsKey(catalogName) || externalCatalogs.containsKey(catalogName)) { + throw new CatalogAlreadyExistsException(catalogName); + } + + catalogs.put(catalogName, catalog); + catalog.open(); + } + + /** +* Gets a catalog by name. +* +* @param catalogName name of the catalog to retrieve +* @return the requested catalog +* @throws CatalogNotExistException thrown if the catalog doesn't exist +* @see CatalogManager#getExternalCatalog(String) +*/ + public Catalog getCatalog(String catalogName) throws CatalogNotExistException { + if (!catalogs.keySet().contains(catalogName)) { + throw new CatalogNotExistException(catalogName); + } + + return catalogs.get(catalogName); + } + + /** +* Registers an external catalog under the given
[GitHub] [flink] bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r283059466 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java ## @@ -55,19 +57,46 @@ * Registers an {@link ExternalCatalog} under a unique name in the TableEnvironment's schema. * All tables registered in the {@link ExternalCatalog} can be accessed. * -* @param nameThe name under which the externalCatalog will be registered +* @param name The name under which the externalCatalog will be registered * @param externalCatalog The externalCatalog to register +* @see TableEnvironment#getCatalog(String) +* @see TableEnvironment#registerCatalog(String, Catalog) +* @deprecated the {@link ExternalCatalog} API is deprecated. Use the corresponding {@link Catalog} API. */ + @Deprecated void registerExternalCatalog(String name, ExternalCatalog externalCatalog); /** * Gets a registered {@link ExternalCatalog} by name. * * @param name The name to look up the {@link ExternalCatalog} * @return The {@link ExternalCatalog} +* @see TableEnvironment#getCatalog(String) +* @see TableEnvironment#registerCatalog(String, Catalog) +* @deprecated the {@link ExternalCatalog} API is deprecated. Use the corresponding {@link Catalog} API. */ + @Deprecated ExternalCatalog getRegisteredExternalCatalog(String name); + /** +* Registers a {@link Catalog} under a unique name. +* All tables registered in the {@link Catalog} can be accessed. +* +* @param name the name under which the catalog will be registered +* @param catalog the catalog to register +* @throws CatalogAlreadyExistsException thrown if catalog with given name already exists +*/ + void registerCatalog(String name, Catalog catalog) throws CatalogAlreadyExistsException; + + /** +* Gets a registered {@link Catalog} by name. 
+* +* @param catalogName The name to look up the {@link Catalog} Review comment: nit: "**the** name ..."
[GitHub] [flink] bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r283063171 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManager.java ## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.CatalogAlreadyExistsException; +import org.apache.flink.table.api.CatalogNotExistException; +import org.apache.flink.table.api.ExternalCatalogAlreadyExistException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.operations.CatalogTableOperation; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A CatalogManager that encapsulates all available catalogs. It also implements the logic of + * table path resolution. Supports both new API ({@link ReadableCatalog} as well as {@link ExternalCatalog}. + */ +@Internal +public class CatalogManager { + private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class); + + // A map between names and catalogs. 
+ private Map<String, Catalog> catalogs; + + // TO BE REMOVED along with ExternalCatalog API + private Map<String, ExternalCatalog> externalCatalogs; + + // The name of the default catalog and schema + private String currentCatalogName; + + private String currentDatabaseName; + + public CatalogManager(String defaultCatalogName, Catalog defaultCatalog) { + catalogs = new LinkedHashMap<>(); + externalCatalogs = new LinkedHashMap<>(); + catalogs.put(defaultCatalogName, defaultCatalog); + this.currentCatalogName = defaultCatalogName; + this.currentDatabaseName = defaultCatalog.getCurrentDatabase(); + } + + /** +* Registers a catalog under the given name. The catalog name must be unique across both +* {@link Catalog}s and {@link ExternalCatalog}s. +* +* @param catalogName name under which to register the given catalog +* @param catalog catalog to register +* @throws CatalogAlreadyExistsException thrown if the name is already taken +*/ + public void registerCatalog(String catalogName, Catalog catalog) throws CatalogAlreadyExistsException { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "Catalog name cannot be null or empty."); + checkNotNull(catalog, "Catalog cannot be null"); + + if (catalogs.containsKey(catalogName) || externalCatalogs.containsKey(catalogName)) { + throw new CatalogAlreadyExistsException(catalogName); + } + + catalogs.put(catalogName, catalog); + catalog.open(); + } + + /** +* Gets a catalog by name. +* +* @param catalogName name of the catalog to retrieve +* @return the requested catalog +* @throws CatalogNotExistException thrown if the catalog doesn't exist +* @see CatalogManager#getExternalCatalog(String) +*/ + public Catalog getCatalog(String catalogName) throws CatalogNotExistException { + if (!catalogs.keySet().contains(catalogName)) { + throw new CatalogNotExistException(catalogName); + } + + return catalogs.get(catalogName); + } + + /** +* Registers an external catalog under the given
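The uniqueness rule in the quoted `registerCatalog` (a name must be free in both the `Catalog` map and the legacy `ExternalCatalog` map) can be sketched in plain Java as below. This is a simplified illustration, not the PR's code: `TwoKindCatalogRegistry` and `CatalogNameTakenException` are hypothetical stand-ins, the type parameters `C` and `E` take the place of `Catalog` and `ExternalCatalog`, and opening the catalog is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for Flink's CatalogAlreadyExistsException. */
class CatalogNameTakenException extends Exception {
    CatalogNameTakenException(String name) {
        super("A catalog named '" + name + "' already exists.");
    }
}

/**
 * Sketch of a registry holding two kinds of catalogs (C ~ Catalog, E ~ ExternalCatalog)
 * that share one name space: a name may be used by at most one catalog of either kind.
 */
class TwoKindCatalogRegistry<C, E> {
    // LinkedHashMap preserves registration order.
    private final Map<String, C> catalogs = new LinkedHashMap<>();
    private final Map<String, E> externalCatalogs = new LinkedHashMap<>();

    void registerCatalog(String name, C catalog) throws CatalogNameTakenException {
        checkName(name);
        Objects.requireNonNull(catalog, "Catalog cannot be null");
        ensureNameIsFree(name);
        catalogs.put(name, catalog);
    }

    void registerExternalCatalog(String name, E externalCatalog) throws CatalogNameTakenException {
        checkName(name);
        Objects.requireNonNull(externalCatalog, "External catalog cannot be null");
        ensureNameIsFree(name);
        externalCatalogs.put(name, externalCatalog);
    }

    /** True if either map already uses the name. */
    boolean contains(String name) {
        return catalogs.containsKey(name) || externalCatalogs.containsKey(name);
    }

    // Rejects null, empty and whitespace-only names.
    private static void checkName(String name) {
        if (name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException("Catalog name cannot be null or empty.");
        }
    }

    // The uniqueness check consults both maps, mirroring the quoted registerCatalog.
    private void ensureNameIsFree(String name) throws CatalogNameTakenException {
        if (contains(name)) {
            throw new CatalogNameTakenException(name);
        }
    }
}
```

Keeping both maps behind one `contains` check means that removing the `ExternalCatalog` API later only requires deleting one map and one registration method.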
[GitHub] [flink] bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
bowenli86 commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r283061626 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; + +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.Function; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.SchemaVersion; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.Table; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +/** + * A mapping between Flink's catalog and Calcite's schema. 
This enables to look up and access tables Review comment: "...access tables **and views** in SQL queries without registering **them** in advance"?
[GitHub] [flink] bowenli86 commented on issue #8390: [FLINK-12469][table] Clean up catalog API on default/current database
bowenli86 commented on issue #8390: [FLINK-12469][table] Clean up catalog API on default/current database URL: https://github.com/apache/flink/pull/8390#issuecomment-491445937 +1 @dawidwys can you take another look? If you don't have other comments, I'll merge this once CI is green.
[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r283056348 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -89,6 +101,34 @@ private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { } } + // -- APIs -- + + /** +* Validate input base table. +* +* @param catalogBaseTable the base table to be validated +* @throws IllegalArgumentException thrown if the input base table is invalid. +*/ + protected abstract void validateCatalogBaseTable(CatalogBaseTable catalogBaseTable) Review comment: Makes sense. Although an IllegalArgumentException can be thrown when any part of the input table is invalid, a better signature may be to throw a CatalogException, to conform with the other existing catalog APIs, since we've defined CatalogException's use case as "in case of any runtime exception". I don't think returning a boolean is a good signature, as the check can be fairly complicated and the caller won't know which part is invalid. Returning a boolean would be a good option for a single-criterion check like "boolean isTable()".
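To illustrate the signature trade-off discussed above, here is a rough sketch, under assumed types, of a validation method that throws a descriptive runtime exception naming the failed check, next to a boolean method used for a single criterion. `TableDescription`, `TableValidator` and `TableValidationException` are hypothetical; the real method takes a `CatalogBaseTable` and would throw Flink's `CatalogException`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a runtime CatalogException. */
class TableValidationException extends RuntimeException {
    TableValidationException(String message) {
        super(message);
    }
}

/** Hypothetical, minimal table description used only for this sketch. */
final class TableDescription {
    final List<String> columnNames;
    final Map<String, String> properties;

    TableDescription(List<String> columnNames, Map<String, String> properties) {
        this.columnNames = columnNames;
        this.properties = properties;
    }
}

final class TableValidator {
    private TableValidator() {}

    /** Returns normally for a valid table; otherwise throws an exception naming the failed check. */
    static void validate(TableDescription table) {
        if (table == null) {
            throw new TableValidationException("Table cannot be null.");
        }
        if (table.columnNames == null || table.columnNames.isEmpty()) {
            throw new TableValidationException("Table must declare at least one column.");
        }
        Set<String> seen = new HashSet<>();
        for (String column : table.columnNames) {
            if (column == null || column.trim().isEmpty()) {
                throw new TableValidationException("Column names cannot be null or empty.");
            }
            if (!seen.add(column)) {
                throw new TableValidationException("Duplicate column name: " + column);
            }
        }
        if (table.properties == null) {
            throw new TableValidationException("Table properties cannot be null.");
        }
    }

    /** A boolean fits a single-criterion check: the caller still knows exactly what failed. */
    static boolean hasColumns(TableDescription table) {
        return table != null && table.columnNames != null && !table.columnNames.isEmpty();
    }
}
```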
[GitHub] [flink] xuefuz commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database
xuefuz commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database URL: https://github.com/apache/flink/pull/8390#discussion_r283047530 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -48,23 +48,27 @@ */ public abstract class HiveCatalogBase implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogBase.class); - - public static final String DEFAULT_DB = "default"; + private static final String DEFAULT_DB = "default"; protected final String catalogName; protected final HiveConf hiveConf; - protected String currentDatabase = DEFAULT_DB; + private final String defaultDatabase; protected IMetaStoreClient client; public HiveCatalogBase(String catalogName, String hivemetastoreURI) { - this(catalogName, getHiveConf(hivemetastoreURI)); + this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI)); } public HiveCatalogBase(String catalogName, HiveConf hiveConf) { + this(catalogName, DEFAULT_DB, hiveConf); + } + + public HiveCatalogBase(String catalogName, String defaultDatabase, HiveConf hiveConf) { checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty"); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty"); this.catalogName = catalogName; - + this.defaultDatabase = checkNotNull(defaultDatabase, "defaultDatabase cannot be null"); Review comment: Adopted suggestion.
[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r283039874 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -363,4 +332,23 @@ public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath throw new UnsupportedOperationException(); } + // -- utils -- + + /** +* Filter out Hive-created properties, and return Flink-created properties. +*/ + private Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) { Review comment: Makes sense to me. Fixed.
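One way `retrieveFlinkProperties` could separate Flink-created from Hive-created table parameters is by key prefix. The sketch below assumes, hypothetically, that Flink stores its own properties under a `flink.` prefix and strips that prefix on the way out; the quoted diff does not show which convention the PR actually uses, and `FlinkPropertyFilter` is an illustrative name.

```java
import java.util.HashMap;
import java.util.Map;

final class FlinkPropertyFilter {
    // Hypothetical convention: Flink writes its own table properties with this key prefix.
    static final String FLINK_PROPERTY_PREFIX = "flink.";

    private FlinkPropertyFilter() {}

    /** Keeps only prefixed (Flink-created) entries and returns them with the prefix removed. */
    static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
        Map<String, String> flinkProperties = new HashMap<>();
        for (Map.Entry<String, String> entry : hiveTableParams.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(FLINK_PROPERTY_PREFIX)) {
                flinkProperties.put(key.substring(FLINK_PROPERTY_PREFIX.length()), entry.getValue());
            }
            // Anything else (e.g. Hive's own "transient_lastDdlTime") is treated as Hive-created and dropped.
        }
        return flinkProperties;
    }
}
```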
[GitHub] [flink] bowenli86 commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database
bowenli86 commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database URL: https://github.com/apache/flink/pull/8390#discussion_r283037447 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -48,23 +48,27 @@ */ public abstract class HiveCatalogBase implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogBase.class); - - public static final String DEFAULT_DB = "default"; + private static final String DEFAULT_DB = "default"; protected final String catalogName; protected final HiveConf hiveConf; - protected String currentDatabase = DEFAULT_DB; + private final String defaultDatabase; protected IMetaStoreClient client; public HiveCatalogBase(String catalogName, String hivemetastoreURI) { - this(catalogName, getHiveConf(hivemetastoreURI)); + this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI)); } public HiveCatalogBase(String catalogName, HiveConf hiveConf) { + this(catalogName, DEFAULT_DB, hiveConf); + } + + public HiveCatalogBase(String catalogName, String defaultDatabase, HiveConf hiveConf) { checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty"); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty"); this.catalogName = catalogName; - + this.defaultDatabase = checkNotNull(defaultDatabase, "defaultDatabase cannot be null"); Review comment: Since we already have `!StringUtils.isNullOrWhitespaceOnly(defaultDatabase)` above, shall we remove the `checkNotNull`?
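The constructor chain in the diff, plus the reviewer's point that `isNullOrWhitespaceOnly` already rejects `null`, can be sketched as follows. `CatalogIdentity` and `requireNonBlank` are hypothetical names; like Flink's `Preconditions.checkArgument`, the helper throws `IllegalArgumentException`, and it returns the value so the field assignment needs no second null check.

```java
/** Hypothetical, simplified analogue of the constructor chain in the quoted diff. */
final class CatalogIdentity {
    static final String DEFAULT_DB = "default";

    private final String catalogName;
    private final String defaultDatabase;

    /** Convenience constructor: falls back to the "default" database. */
    CatalogIdentity(String catalogName) {
        this(catalogName, DEFAULT_DB);
    }

    CatalogIdentity(String catalogName, String defaultDatabase) {
        // One check per argument: rejecting null-or-blank already covers null,
        // so an extra checkNotNull on the same value would add nothing.
        this.catalogName = requireNonBlank(catalogName, "catalogName");
        this.defaultDatabase = requireNonBlank(defaultDatabase, "defaultDatabase");
    }

    private static String requireNonBlank(String value, String argumentName) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(argumentName + " cannot be null or empty");
        }
        return value;
    }

    String getCatalogName() {
        return catalogName;
    }

    String getDefaultDatabase() {
        return defaultDatabase;
    }
}
```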
[GitHub] [flink] bowenli86 commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database
bowenli86 commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database URL: https://github.com/apache/flink/pull/8390#discussion_r283036856 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java ## @@ -65,9 +64,15 @@ private final Map> partitionColumnStats; public GenericInMemoryCatalog(String name) { + this(name, DEFAULT_DB); + } + + public GenericInMemoryCatalog(String name, String defaultDatabase) { Review comment: +1, I like Xuefu's approach, as it conforms better to the YAML config.
[GitHub] [flink] azagrebin commented on issue #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
azagrebin commented on issue #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#issuecomment-491418905 @flinkbot attention @zhijiangW
[GitHub] [flink] flinkbot commented on issue #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
flinkbot commented on issue #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#issuecomment-491418683 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] azagrebin opened a new pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
azagrebin opened a new pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416 ## What is the purpose of the change Introduce partition/gate setup to decouple task registration with NetworkEnvironment. The PR also does some preparatory refactoring. ## Brief change log - Introduce factories for ResultPartition and SingleInputGate - Introduce buffer pool factories for ResultPartition and SingleInputGate - Introduce MemorySegmentProvider for RemoteInputChannel to assign exclusive segments - Refactor NetworkEnvironment#setupXXX() to ResultPartitionWriter#setup() and InputGate#setup() ## Verifying this change Existing unit tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r283025643 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala ## @@ -40,63 +40,67 @@ object ExternalTableUtil extends Logging { * @param externalTable the [[ExternalCatalogTable]] instance which to convert * @return converted [[TableSourceTable]] instance from the input catalog table */ - def fromExternalCatalogTable[T1, T2]( - tableEnv: TableEnvironment, - externalTable: ExternalCatalogTable) + def fromExternalCatalogTable[T1, T2](isBatch: Boolean, externalTable: ExternalCatalogTable) : TableSourceSinkTable[T1, T2] = { val statistics = new FlinkStatistic(toScala(externalTable.getTableStats)) val source: Option[TableSourceTable[T1]] = if (externalTable.isTableSource) { - Some(createTableSource(tableEnv, externalTable, statistics)) + Some(createTableSource(isBatch, externalTable, statistics)) } else { None } val sink: Option[TableSinkTable[T2]] = if (externalTable.isTableSink) { - Some(createTableSink(tableEnv, externalTable, statistics)) + Some(createTableSink(isBatch, externalTable, statistics)) } else { None } new TableSourceSinkTable[T1, T2](source, sink) } + def getTableSchema(externalTable: ExternalCatalogTable) : TableSchema = { +if (externalTable.isTableSource) { + TableFactoryUtil.findAndCreateTableSource[Any](externalTable).getTableSchema +} else { + val tableSink = TableFactoryUtil.findAndCreateTableSink(externalTable) + new TableSchema(tableSink.getFieldNames, tableSink.getFieldTypes) +} + } + private def createTableSource[T]( - tableEnv: TableEnvironment, + isBatch: Boolean, externalTable: ExternalCatalogTable, statistics: FlinkStatistic) -: TableSourceTable[T] = tableEnv match { - -case _: BatchTableEnvImpl if externalTable.isBatchTable => +: TableSourceTable[T] = { +if (isBatch && externalTable.isBatchTable) { 
val source = TableFactoryUtil.findAndCreateTableSource(externalTable) new BatchTableSourceTable[T](source.asInstanceOf[BatchTableSource[T]], statistics) - -case _: StreamTableEnvImpl if externalTable.isStreamTable => +} else if (!isBatch && externalTable.isStreamTable) { val source = TableFactoryUtil.findAndCreateTableSource(externalTable) new StreamTableSourceTable[T](source.asInstanceOf[StreamTableSource[T]], statistics) - -case _ => +} else { throw new ValidationException( "External catalog table does not support the current environment for a table source.") Review comment: This message might need to change because it covers two cases: isBatch==true && externalTable.isStreamTable, and isBatch==false && externalTable.isBatchTable.
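Following the review comment, the fallback branch could report which of the two mismatches occurred. Below is a sketch in Java (the quoted `ExternalTableUtil` is Scala) that takes the three flags from the diff; `SourceModeCheck` is hypothetical, and `IllegalArgumentException` stands in for Flink's `ValidationException`.

```java
final class SourceModeCheck {
    private SourceModeCheck() {}

    /**
     * Returns normally if the table can act as a source in the current environment;
     * otherwise throws with a message that names both the environment and the table's mode.
     */
    static void checkSourceMode(boolean isBatch, boolean isBatchTable, boolean isStreamTable) {
        if ((isBatch && isBatchTable) || (!isBatch && isStreamTable)) {
            return; // the table supports the current environment
        }
        String environment = isBatch ? "batch" : "streaming";
        String supported;
        if (isBatchTable) {
            supported = "only batch";
        } else if (isStreamTable) {
            supported = "only streaming";
        } else {
            supported = "neither batch nor streaming";
        }
        throw new IllegalArgumentException(
            "External catalog table cannot be used as a table source in a " + environment
                + " environment; it supports " + supported + " mode.");
    }
}
```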
[jira] [Commented] (FLINK-10520) Job save points REST API fails unless parameters are specified
[ https://issues.apache.org/jira/browse/FLINK-10520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837525#comment-16837525 ] Tim commented on FLINK-10520: - Is there any update on this issue? > Job save points REST API fails unless parameters are specified > -- > > Key: FLINK-10520 > URL: https://issues.apache.org/jira/browse/FLINK-10520 > Project: Flink > Issue Type: Bug > Components: Runtime / REST >Affects Versions: 1.6.1 >Reporter: Elias Levy >Assignee: Chesnay Schepler >Priority: Minor > > The new REST API POST endpoint, {{/jobs/:jobid/savepoints}}, returns an error > unless the request includes a body with all parameters ({{target-directory}} > and {{cancel-job}}), even though the > [documentation|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.html] > suggests these are optional. > If a POST request with no data is made, the response is a 400 status code > with the error message "Bad request received." > If the POST request submits an empty JSON object ( {} ), the response is a > 400 status code with the error message "Request did not match expected format > SavepointTriggerRequestBody." The same is true if only the > {{target-directory}} or {{cancel-job}} parameter is included. > As the system is configured with a default savepoint location, there > shouldn't be a need to include the parameter in the request.
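The behavior the reporter expects (both body fields optional, with the target directory falling back to the configured default) can be sketched as a request object with nullable fields. `SavepointTriggerRequestSketch` and its methods are hypothetical and are not Flink's `SavepointTriggerRequestBody`; the configured default would typically come from `state.savepoints.dir`, and `cancel-job` is assumed to default to `false`.

```java
import java.util.Optional;

/** Hypothetical request body in which both fields may be absent (null). */
final class SavepointTriggerRequestSketch {
    private final String targetDirectory; // "target-directory", may be null
    private final Boolean cancelJob;      // "cancel-job", may be null

    SavepointTriggerRequestSketch(String targetDirectory, Boolean cancelJob) {
        this.targetDirectory = targetDirectory;
        this.cancelJob = cancelJob;
    }

    /**
     * Uses the explicit directory if given, else the cluster's configured default.
     * Empty means neither is available, which is the only case that should be rejected.
     */
    Optional<String> resolveTargetDirectory(String configuredDefault) {
        if (targetDirectory != null) {
            return Optional.of(targetDirectory);
        }
        return Optional.ofNullable(configuredDefault);
    }

    /** Assumed default: do not cancel the job unless explicitly requested. */
    boolean resolveCancelJob() {
        return cancelJob != null && cancelJob;
    }
}
```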
[jira] [Commented] (FLINK-7697) Add metrics for Elasticsearch Sink
[ https://issues.apache.org/jira/browse/FLINK-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837512#comment-16837512 ] Piyush Goyal commented on FLINK-7697: - +1 on adding metrics to ES Sink. A few other metrics that would be useful: * Number of failed index requests (tagged by failure reason). * End-to-end latency for bulk requests. Is anyone working on it currently? We (at Netflix) are starting to add these metrics to our private fork. If no one else has started working on it, we'll be happy to contribute it. > Add metrics for Elasticsearch Sink > -- > > Key: FLINK-7697 > URL: https://issues.apache.org/jira/browse/FLINK-7697 > Project: Flink > Issue Type: Wish > Components: Connectors / ElasticSearch >Reporter: Hai Zhou >Assignee: xueyu >Priority: Major > > We should add metrics to track events written to ElasticsearchSink, > e.g. > * number of successful bulk sends > * number of documents inserted > * number of documents updated > * number of document version conflicts
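The metrics proposed in the comment (successful bulk sends, failed index requests broken down by reason, bulk request latency) could be tracked with counters like the ones below. This is a plain-Java sketch with hypothetical names; in an actual Flink sink these values would be exposed through the operator's metric group rather than kept in a standalone class.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical holder for Elasticsearch sink metrics; thread-safe for bulk-listener callbacks. */
final class BulkRequestMetrics {
    private final LongAdder successfulBulkRequests = new LongAdder();
    // One counter per failure reason, created lazily on first failure.
    private final Map<String, LongAdder> failedIndexRequestsByReason = new ConcurrentHashMap<>();
    // Latency of the most recently acknowledged bulk request; -1 until the first one completes.
    private final AtomicLong lastBulkLatencyMillis = new AtomicLong(-1);

    void onBulkSuccess(long sendTimeMillis, long ackTimeMillis) {
        successfulBulkRequests.increment();
        lastBulkLatencyMillis.set(ackTimeMillis - sendTimeMillis);
    }

    void onIndexRequestFailure(String reason) {
        failedIndexRequestsByReason.computeIfAbsent(reason, r -> new LongAdder()).increment();
    }

    long successfulBulkRequests() {
        return successfulBulkRequests.sum();
    }

    long failedIndexRequests(String reason) {
        LongAdder counter = failedIndexRequestsByReason.get(reason);
        return counter == null ? 0 : counter.sum();
    }

    long lastBulkLatencyMillis() {
        return lastBulkLatencyMillis.get();
    }
}
```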
[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r282981545 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; + +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.Function; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.SchemaVersion; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.Table; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +/** + * A mapping between Flink's catalog and Calcite's schema. 
This enables to look up and access tables + * in SQL queries without registering tables in advance. Databases are registered as sub-schemas in the schema. + */ +@Internal +public class CatalogCalciteSchema implements Schema { + private static final Logger LOGGER = LoggerFactory.getLogger(CatalogCalciteSchema.class); + + private final String catalogName; + private final Catalog catalog; + + public CatalogCalciteSchema(String catalogName, Catalog catalog) { + this.catalogName = catalogName; + this.catalog = catalog; + } + + /** +* Look up a sub-schema (database) by the given sub-schema name. +* +* @param schemaName name of sub-schema to look up +* @return the sub-schema with a given dbName, or null +*/ + @Override + public Schema getSubSchema(String schemaName) { + + if (catalog.databaseExists(schemaName)) { + return new DatabaseCalciteSchema(schemaName, catalog); + } else { + LOGGER.error(String.format("Schema %s does not exist in catalog %s", schemaName, catalogName)); + throw new CatalogException(new DatabaseNotExistException(catalogName, schemaName)); + } + } + + @Override + public Set<String> getSubSchemaNames() { + return new HashSet<>(catalog.listDatabases()); + } + + @Override + public Table getTable(String name) { + return null; + } + + @Override + public Set<String> getTableNames() { + return new HashSet<>(); + } + + @Override + public RelProtoDataType getType(String name) { + return null; Review comment: Returning "null" means no type with the given name is found. I'm not sure if that's desired, but feel free to add a TODO item for this if necessary.
[GitHub] [flink] yuyang08 closed pull request #8067: [FLINK-11746][formats] Add thrift format support to Flink
yuyang08 closed pull request #8067: [FLINK-11746][formats] Add thrift format support to Flink URL: https://github.com/apache/flink/pull/8067
[GitHub] [flink] StephanEwen commented on issue #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions
StephanEwen commented on issue #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions URL: https://github.com/apache/flink/pull/8290#issuecomment-491373523 @zhijiangW Our reasoning about the three options seems similar. My understanding is that even if CancelPartition is not reliable, it can only fail when the TCP connection fails. In that case, all associated SubpartitionViews are released immediately, which should work.
[GitHub] [flink] StephanEwen commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions
StephanEwen commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions URL: https://github.com/apache/flink/pull/8290#discussion_r282978232 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java ## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Iterator; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The reader (read view) of a BoundedBlockingSubpartition. + */ +class BoundedBlockingSubpartitionReader implements ResultSubpartitionView { + + /** The result subpartition that we read. 
*/ + private final BoundedBlockingSubpartition parent; + + /** Further byte buffers, to handle cases where there is more data than fits into +* one mapped byte buffer (2GB = Integer.MAX_VALUE). */ + private final Iterator<ByteBuffer> furtherData; + + /** The reader/decoder to the memory mapped region with the data we currently read from. +* Max 2GB large. Further regions may be in the {@link #furtherData} field. +* Null once the reader empty or disposed.*/ + @Nullable + private BoundedBlockingSubpartitionMemory.Reader data; + + /** The next buffer (look ahead). Null once the data is depleted or reader is disposed. */ + @Nullable + private Buffer nextBuffer; + + /** The remaining number of data buffers (not events) in the result. */ + private int bufferBacklog; + + /** Flag whether this reader is released. Atomic, to avoid double release. */ + private boolean isReleased; + + /** +* Convenience constructor that takes a single buffer. +*/ + BoundedBlockingSubpartitionReader( Review comment: have removed it
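The `furtherData` field exists because a single mapped `ByteBuffer` is capped at 2GB (`Integer.MAX_VALUE`), so a larger partition is spread over several regions. A minimal sketch of that reading pattern, assuming plain heap buffers and records (here just `int`s) that never straddle a region boundary; `MultiRegionReader` is a hypothetical name, not the class under review:

```java
import java.nio.ByteBuffer;
import java.util.Iterator;

/** Hypothetical sketch: reads ints from a chain of regions, advancing when one is exhausted. */
final class MultiRegionReader {

    /** Regions not yet started, playing the role of the furtherData field. */
    private final Iterator<ByteBuffer> furtherData;

    /** Region currently being read; null once every region is depleted. */
    private ByteBuffer current;

    MultiRegionReader(Iterator<ByteBuffer> regions) {
        this.furtherData = regions;
        this.current = regions.hasNext() ? regions.next() : null;
    }

    /** Returns the next value, or null when all regions are consumed. */
    Integer next() {
        // Skip exhausted (or empty) regions; assumes no value straddles two regions.
        while (current != null && !current.hasRemaining()) {
            current = furtherData.hasNext() ? furtherData.next() : null;
        }
        return current == null ? null : current.getInt();
    }
}
```

The reader only ever holds one region "open" at a time, which mirrors how the reviewed class keeps a single `data` reader plus an iterator over the remaining regions.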
[GitHub] [flink] StephanEwen commented on issue #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions
StephanEwen commented on issue #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions URL: https://github.com/apache/flink/pull/8290#issuecomment-491372403 Thank you for the reviews. I refactored some code: - Pulled out the `MemoryMappedBuffers` class to better test `size()` and region-spanning writes, and to avoid leaking the region configuration into the `BoundedBlockingSubpartition`. - Added comments to make the threading model assumptions clearer. - Addressed comments on typos, etc. The code staged to be merged is in https://github.com/StephanEwen/incubator-flink/commit/a7cf24383be9f310fb5ccc5a032721421fa45791 I think overall, this is now in really good shape. Waiting for Travis to pass, will then merge this. Follow-ups are to actually make use of the "multiple read" functionality.
[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r282976905 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CalciteCatalogTable.java ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.calcite.FlinkTypeFactory; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.schema.Table; + +import java.util.Map; +import java.util.Optional; + +/** + * Thin wrapper around Calcite specific {@link Table}, this is a temporary solution + * that allows to register those tables in the {@link CatalogManager}. + * TODO remove once we decouple TableEnvironment from Calcite. 
+ */ +@Internal +public class CalciteCatalogTable implements CatalogBaseTable { + private final Table table; + private final FlinkTypeFactory typeFactory; + + public CalciteCatalogTable(Table table, FlinkTypeFactory typeFactory) { + this.table = table; + this.typeFactory = typeFactory; + } + + public Table getTable() { + return table; + } + + @Override + public Map<String, String> getProperties() { + throw new UnsupportedOperationException("Calcite table cannot be expressed as a map of properties."); + } + + @Override + public TableSchema getSchema() { + RelDataType relDataType = table.getRowType(typeFactory); + + String[] fieldNames = relDataType.getFieldNames().toArray(new String[0]); + TypeInformation[] fieldTypes = relDataType.getFieldList() + .stream() + .map(field -> FlinkTypeFactory.toTypeInfo(field.getType())).toArray(TypeInformation[]::new); + + return new TableSchema(fieldNames, fieldTypes); + } + + @Override + public String getComment() { + return null; + } + + @Override + public CatalogBaseTable copy() { + return this; Review comment: Maybe we should throw an exception here, because we are not able to provide a copy. Returning just `this` might be dangerous, because the caller assumes a copy and may make changes to the returned object.
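A minimal sketch of the suggestion above. To compile without Flink on the classpath it uses a hypothetical `CopyableTable` interface in place of `CatalogBaseTable`; only the `copy()` behaviour is meant to carry over:

```java
/** Hypothetical stand-in for the copy() contract of CatalogBaseTable. */
interface CopyableTable {
    /** Returns an independent copy that the caller may modify freely. */
    CopyableTable copy();
}

/** Sketch of a wrapper that cannot honour the copy() contract and says so. */
final class CalciteBackedTable implements CopyableTable {

    @Override
    public CopyableTable copy() {
        // Returning `this` would let a caller mutate what it believes is a private copy.
        throw new UnsupportedOperationException("A Calcite-backed table cannot be copied.");
    }
}
```

The trade-off is that a caller which copies defensively now fails fast, instead of silently sharing mutable state with the catalog.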
[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r282967862 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CalciteCatalogTable.java ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.calcite.FlinkTypeFactory; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.schema.Table; + +import java.util.Map; +import java.util.Optional; + +/** + * Thin wrapper around Calcite specific {@link Table}, this is a temporary solution + * that allows to register those tables in the {@link CatalogManager}. + * TODO remove once we decouple TableEnvironment from Calcite. 
+ */ +@Internal +public class CalciteCatalogTable implements CatalogBaseTable { + private final Table table; + private final FlinkTypeFactory typeFactory; + + public CalciteCatalogTable(Table table, FlinkTypeFactory typeFactory) { + this.table = table; + this.typeFactory = typeFactory; + } + + public Table getTable() { + return table; + } + + @Override + public Map<String, String> getProperties() { + throw new UnsupportedOperationException("Calcite table cannot be expressed as a map of properties."); Review comment: It might be better to just return an empty map instead of throwing an exception. By definition, getProperties() returns any additional properties a table might have; it does not mean the property form of the table.
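A sketch of the empty-map alternative, outside of Flink (the holder class `NoExtraProperties` is hypothetical). `Collections.emptyMap()` is immutable, so a caller cannot mistake it for a live view of the table's state:

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical holder showing the suggested getProperties() behaviour. */
final class NoExtraProperties {

    /** A table without additional properties reports an empty, immutable map. */
    static Map<String, String> getProperties() {
        return Collections.emptyMap();
    }
}
```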
[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r282961373 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java ## @@ -280,6 +316,55 @@ */ void sqlUpdate(String stmt, QueryConfig config); + /** +* Gets the current default catalog name of the current session. +* +* @return the current default catalog that is used for path resolution +* @see TableEnvironment#setCurrentCatalog(String) +*/ + String getCurrentCatalogName(); + + /** +* Sets the current catalog to the given value. It also sets the default +* database to the catalog's default one. To assign both catalog and database explicitly +* see {@link TableEnvironment#setCurrentDatabase(String, String)}. +* +* This is used during resolution of object paths. The default path is constructed as +* {@code [current-catalog].[current.database]}. During the resolution, first we try to look for +* {@code [default-path].[object-path]} if no object is found we assume the object path is a fully +* qualified one and we look for {@code [object-path]}. +* +* @param name name of the catalog to set as current default catalog +* @throws CatalogNotExistException thrown if the catalog doesn't exist +*/ + void setCurrentCatalog(String name) throws CatalogNotExistException; + + /** +* Gets the current default database name of the running session. +* +* @return the current database of the current catalog +* @see TableEnvironment#setCurrentDatabase(String, String) +*/ + String getCurrentDatabaseName(); + + /** +* Sets the current default catalog and database. That path will be used as the default one +* when looking for unqualified object names. +* +* This is used during resolution of object paths. The default path is constructed as +* {@code [current-catalog].[current.database]}. 
During the resolution, first we try to look for Review comment: should it be [current-database]?
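The two-step lookup described in the Javadoc above (first `[current-catalog].[current-database].[object-path]`, then `[object-path]` as a fully qualified name) can be sketched as follows. `PathResolver` and its set of registered names are hypothetical stand-ins for the `CatalogManager`, and paths are plain dot-joined strings rather than Flink's own path types:

```java
import java.util.Optional;
import java.util.Set;

/** Hypothetical sketch of the default-path-then-fully-qualified lookup. */
final class PathResolver {

    /** Fully qualified names such as "cat.db.table"; stands in for the registered catalogs. */
    private final Set<String> registered;
    private final String currentCatalog;
    private final String currentDatabase;

    PathResolver(Set<String> registered, String currentCatalog, String currentDatabase) {
        this.registered = registered;
        this.currentCatalog = currentCatalog;
        this.currentDatabase = currentDatabase;
    }

    Optional<String> resolve(String objectPath) {
        // 1) Look under the default path: [current-catalog].[current-database].[object-path]
        String underDefaultPath = currentCatalog + "." + currentDatabase + "." + objectPath;
        if (registered.contains(underDefaultPath)) {
            return Optional.of(underDefaultPath);
        }
        // 2) Otherwise treat [object-path] as already fully qualified.
        if (registered.contains(objectPath)) {
            return Optional.of(objectPath);
        }
        return Optional.empty();
    }
}
```

Because the default path is tried first, an unqualified name shadows a fully qualified one only if both happen to spell the same string, which is why the order of the two steps matters.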
[GitHub] [flink] xuefuz commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database
xuefuz commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database URL: https://github.com/apache/flink/pull/8390#discussion_r282954828 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java ## @@ -65,9 +64,15 @@ private final Map> partitionColumnStats; public GenericInMemoryCatalog(String name) { + this(name, DEFAULT_DB); + } + + public GenericInMemoryCatalog(String name, String defaultDatabase) { Review comment: Cool. Got it. Will update. Thanks.
[GitHub] [flink] StephanEwen commented on issue #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions
StephanEwen commented on issue #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions URL: https://github.com/apache/flink/pull/8290#issuecomment-491335910 @zhijiangW Thanks for the detailed review and good comments.
[GitHub] [flink] StephanEwen commented on issue #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions
StephanEwen commented on issue #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions URL: https://github.com/apache/flink/pull/8290#issuecomment-491336129 @eaglewatcherwb This implementation makes no assumption about when a partition will be released. That is the responsibility of the network environment.
[GitHub] [flink] ambition119 commented on issue #8387: [FLINK-11982] [table] File System Connector's support JSON Format and JSON file BatchTableSource
ambition119 commented on issue #8387: [FLINK-11982] [table] File System Connector's support JSON Format and JSON file BatchTableSource URL: https://github.com/apache/flink/pull/8387#issuecomment-491337056 > Ad. 1 Once again we aim to separate connector and formats => JsonFile = Json + File > Ad. 2 It provides `FileSystem` descriptor not connector. See my comments about `CsvTableSource`. Its design is broken and we do not want to repeat this error > Ad. 3 Good to know, Hive is not the source of truth though. Unfortunately we cannot accept general purpose code that works in just a single very narrow setup. @StephanEwen @dawidwys Thank you for your suggestions and responses. I am developing based on Flink 1.6.x, and now I understand what you are talking about, so I will close this PR.
[GitHub] [flink] ambition119 closed pull request #8387: [FLINK-11982] [table] File System Connector's support JSON Format and JSON file BatchTableSource
ambition119 closed pull request #8387: [FLINK-11982] [table] File System Connector's support JSON Format and JSON file BatchTableSource URL: https://github.com/apache/flink/pull/8387
[GitHub] [flink] StephanEwen commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions
StephanEwen commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions URL: https://github.com/apache/flink/pull/8290#discussion_r282938868 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionMemory.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; + +import javax.annotation.Nullable; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Putting and getting of buffers to/from a region of memory. + * This class handles the headers, length encoding, memory slicing. 
+ */ +final class BoundedBlockingSubpartitionMemory { + + // all fields and methods below here have package-private access to avoid bridge + // methods when accessing them from the nested classes + + static final int HEADER_LENGTH = 8; + + static final int HEADER_VALUE_IS_BUFFER = 0; + + static final int HEADER_VALUE_IS_EVENT = 1; + + static ByteBuffer checkAndConfigureByteBuffer(ByteBuffer buffer) { + checkArgument(buffer.position() == 0); + checkArgument(buffer.capacity() > 8); + checkArgument(buffer.limit() == buffer.capacity()); + + return buffer.order(ByteOrder.nativeOrder()); + } + + // + + static final class Writer { + + private final ByteBuffer memory; + + Writer(ByteBuffer memory) { + this.memory = checkAndConfigureByteBuffer(memory); + } + + public boolean writeBuffer(Buffer buffer) { + final ByteBuffer memory = this.memory; + final int bufferSize = buffer.getSize(); + + if (memory.remaining() < bufferSize + HEADER_LENGTH) { + return false; + } + + memory.putInt(buffer.isBuffer() ? HEADER_VALUE_IS_BUFFER : HEADER_VALUE_IS_EVENT); + memory.putInt(bufferSize); + memory.put(buffer.getNioBufferReadable()); + return true; + } + + public ByteBuffer complete() { + memory.flip(); + return memory; + } + + public int getNumBytes() { + return memory.position(); + } + } + + static final class Reader { + + private final ByteBuffer memory; + + Reader(ByteBuffer memory) { + this.memory = checkAndConfigureByteBuffer(memory); + } + + @Nullable + public Buffer sliceNextBuffer() { + final ByteBuffer memory = this.memory; Review comment: This localizes the scope. It makes clear to the reader that this is strictly local access, and it often helps Java avoid some instructions (no indirection via the implicit `this.`).
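The framing used by `Writer` and `Reader` above is a 4-byte type header (`HEADER_VALUE_IS_BUFFER` or `HEADER_VALUE_IS_EVENT`), a 4-byte length, then the payload, so `HEADER_LENGTH` is 8. A self-contained sketch of that round trip, assuming a heap `ByteBuffer` and `byte[]` payloads in place of Flink's `Buffer`; it also copies the field into a local variable, as the review comment recommends. `FramedRegion` and `Record` are hypothetical names:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical sketch of the [type:int][length:int][payload] framing. */
final class FramedRegion {

    static final int HEADER_LENGTH = 8;
    static final int HEADER_VALUE_IS_BUFFER = 0;
    static final int HEADER_VALUE_IS_EVENT = 1;

    /** One decoded record. */
    static final class Record {
        final boolean isBuffer;
        final byte[] payload;

        Record(boolean isBuffer, byte[] payload) {
            this.isBuffer = isBuffer;
            this.payload = payload;
        }
    }

    private final ByteBuffer memory;

    FramedRegion(int capacity) {
        // Native byte order, matching what checkAndConfigureByteBuffer() sets.
        this.memory = ByteBuffer.allocate(capacity).order(ByteOrder.nativeOrder());
    }

    /** Appends one record; returns false if header plus payload do not fit. */
    boolean write(boolean isBuffer, byte[] payload) {
        final ByteBuffer memory = this.memory; // localized field access
        if (memory.remaining() < payload.length + HEADER_LENGTH) {
            return false;
        }
        memory.putInt(isBuffer ? HEADER_VALUE_IS_BUFFER : HEADER_VALUE_IS_EVENT);
        memory.putInt(payload.length);
        memory.put(payload);
        return true;
    }

    /** Switches from writing to reading, like Writer.complete(). */
    void complete() {
        memory.flip();
    }

    /** Returns the next record, or null when no full header remains. */
    Record readNext() {
        final ByteBuffer memory = this.memory; // localized field access
        if (memory.remaining() < HEADER_LENGTH) {
            return null;
        }
        final boolean isBuffer = memory.getInt() == HEADER_VALUE_IS_BUFFER;
        final byte[] payload = new byte[memory.getInt()];
        memory.get(payload);
        return new Record(isBuffer, payload);
    }
}
```

One design difference: this sketch copies each payload into a new array, while the reviewed `sliceNextBuffer()` presumably slices the mapped memory to avoid a copy.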
[GitHub] [flink] StephanEwen commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions
StephanEwen commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions URL: https://github.com/apache/flink/pull/8290#discussion_r282938255 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PageSizeUtil.java ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent; +import org.apache.flink.shaded.netty4.io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess; + +import sun.misc.Unsafe; + +import javax.annotation.Nullable; + +import java.security.AccessController; +import java.security.PrivilegedAction; + +/** + * Utility for accessing the system page size. + */ +final class PageSizeUtil { + + /** Value indicating an unknown page size. */ + public static final int PAGE_SIZE_UNKOWN = -1; + + /** The default page size on most system. 
*/ + public static final int DEFAULT_PAGE_SIZE = 4 * 1024; + + /** A conservative fallback value (64 KiBytes) that should be a multiple of the page size even +* in some uncommon cases of servers installations with larger-than-usual page sizes. */ + public static final int CONSERVATIVE_PAGE_SIZE_MULTIPLE = 64 * 1024; + + /** +* Tries to get the system page size. If the page size cannot be determined, this +* returns -1. +* +* This internally relies on the presence of "unsafe" and the resolution via some +* Netty utilities. +*/ + public static int getSystemPageSize() { + try { + return PageSizeUtilInternal.getSystemPageSize(); + } + catch (Throwable t) { + ExceptionUtils.rethrowIfFatalError(t); + return PAGE_SIZE_UNKOWN; + } + } + + /** +* Tries to get the system page size. If the page size cannot be determined, this +* returns the {@link #DEFAULT_PAGE_SIZE}. +*/ + public static int getSystemPageSizeOrDefault() { + final int pageSize = getSystemPageSize(); + return pageSize == PAGE_SIZE_UNKOWN ? DEFAULT_PAGE_SIZE : pageSize; + } + + /** +* Tries to get the system page size. If the page size cannot be determined, this +* returns the {@link #CONSERVATIVE_PAGE_SIZE_MULTIPLE}. +*/ + public static int getSystemPageSizeOrConservativeMultiple() { Review comment: This util is meant as a more general util.
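The fallback chain in `PageSizeUtil` can be sketched without `Unsafe`. Here a hypothetical `IntSupplier` probe stands in for `PageSizeUtilInternal.getSystemPageSize()`, and the body of the conservative variant is inferred from its Javadoc, since it is cut off above. Unlike the original, which catches `Throwable` and rethrows fatal errors via `ExceptionUtils.rethrowIfFatalError`, this sketch only catches `RuntimeException` and `LinkageError`. It also uses the corrected spelling `PAGE_SIZE_UNKNOWN`:

```java
import java.util.function.IntSupplier;

/** Hypothetical sketch of PageSizeUtil's fallback chain with a pluggable probe. */
final class PageSizeFallback {

    static final int PAGE_SIZE_UNKNOWN = -1;
    static final int DEFAULT_PAGE_SIZE = 4 * 1024;
    static final int CONSERVATIVE_PAGE_SIZE_MULTIPLE = 64 * 1024;

    /** Asks the probe for the page size; any probe failure maps to PAGE_SIZE_UNKNOWN. */
    static int getSystemPageSize(IntSupplier probe) {
        try {
            return probe.getAsInt();
        } catch (RuntimeException | LinkageError e) {
            // e.g. NoClassDefFoundError when sun.misc.Unsafe is unavailable
            return PAGE_SIZE_UNKNOWN;
        }
    }

    /** Falls back to the common 4 KiB page size. */
    static int getSystemPageSizeOrDefault(IntSupplier probe) {
        final int pageSize = getSystemPageSize(probe);
        return pageSize == PAGE_SIZE_UNKNOWN ? DEFAULT_PAGE_SIZE : pageSize;
    }

    /** Falls back to 64 KiB, which should be a multiple of most page sizes. */
    static int getSystemPageSizeOrConservativeMultiple(IntSupplier probe) {
        final int pageSize = getSystemPageSize(probe);
        return pageSize == PAGE_SIZE_UNKNOWN ? CONSERVATIVE_PAGE_SIZE_MULTIPLE : pageSize;
    }
}
```

Callers that only need a plausible page size would use the default variant, while callers that must align to page boundaries would prefer the conservative multiple.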
[GitHub] [flink] StephanEwen commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions
StephanEwen commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions URL: https://github.com/apache/flink/pull/8290#discussion_r282938159 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PageSizeUtil.java ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent; +import org.apache.flink.shaded.netty4.io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess; + +import sun.misc.Unsafe; + +import javax.annotation.Nullable; + +import java.security.AccessController; +import java.security.PrivilegedAction; + +/** + * Utility for accessing the system page size. + */ +final class PageSizeUtil { + + /** Value indicating an unknown page size. */ + public static final int PAGE_SIZE_UNKOWN = -1; + + /** The default page size on most system. 
*/ + public static final int DEFAULT_PAGE_SIZE = 4 * 1024; + + /** A conservative fallback value (64 KiBytes) that should be a multiple of the page size even +* in some uncommon cases of servers installations with larger-than-usual page sizes. */ + public static final int CONSERVATIVE_PAGE_SIZE_MULTIPLE = 64 * 1024; + + /** +* Tries to get the system page size. If the page size cannot be determined, this +* returns -1. +* +* This internally relies on the presence of "unsafe" and the resolution via some +* Netty utilities. +*/ + public static int getSystemPageSize() { + try { + return PageSizeUtilInternal.getSystemPageSize(); + } + catch (Throwable t) { + ExceptionUtils.rethrowIfFatalError(t); + return PAGE_SIZE_UNKOWN; + } + } + + /** +* Tries to get the system page size. If the page size cannot be determined, this +* returns the {@link #DEFAULT_PAGE_SIZE}. +*/ + public static int getSystemPageSizeOrDefault() { Review comment: This util is meant as a more general util.
[GitHub] [flink] StephanEwen commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions
StephanEwen commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions URL: https://github.com/apache/flink/pull/8290#discussion_r282938334 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PageSizeUtil.java ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent; +import org.apache.flink.shaded.netty4.io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess; + +import sun.misc.Unsafe; + +import javax.annotation.Nullable; + +import java.security.AccessController; +import java.security.PrivilegedAction; + +/** + * Utility for accessing the system page size. + */ +final class PageSizeUtil { + + /** Value indicating an unknown page size. */ + public static final int PAGE_SIZE_UNKOWN = -1; Review comment: will fix that
[GitHub] [flink] sjwiesman commented on a change in pull request #8330: [FLINK-12388][docs] Update the production readiness checklist
sjwiesman commented on a change in pull request #8330: [FLINK-12388][docs] Update the production readiness checklist URL: https://github.com/apache/flink/pull/8330#discussion_r282937987 ## File path: docs/ops/production_ready.md ## @@ -22,79 +22,54 @@ specific language governing permissions and limitations under the License. --> +The production readiness checklist provides an overview of configuration options that should be carefully considered before bringing an Apache Flink job into production. +While the Flink community has attempted to provide sensible defaults for each configuration, it is important to review this list and ensure the options chosen are sufficient for your needs. + * ToC {:toc} -## Production Readiness Checklist - -Purpose of this production readiness checklist is to provide a condensed overview of configuration options that are -important and need **careful considerations** if you plan to bring your Flink job into **production**. For most of these options -Flink provides out-of-the-box defaults to make usage and adoption of Flink easier. For many users and scenarios, those -defaults are good starting points for development and completely sufficient for "one-shot" jobs. - -However, once you are planning to bring a Flink application to production the requirements typically increase. For example, -you want your job to be (re-)scalable and to have a good upgrade story for your job and new Flink versions. - -In the following, we present a collection of configuration options that you should check before your job goes into production. - -### Set maximum parallelism for operators explicitly - -Maximum parallelism is a configuration parameter that is newly introduced in Flink 1.2 and has important implications -for the (re-)scalability of your Flink job. This parameter, which can be set on a per-job and/or per-operator granularity, -determines the maximum parallelism to which you can scale operators. 
It is important to understand that (as of now) there -is **no way to change** this parameter after your job has been started, except for restarting your job completely -from scratch (i.e. with a new state, and not from a previous checkpoint/savepoint). Even if Flink would provide some way -to change maximum parallelism for existing savepoints in the future, you can already assume that for large states this is -likely a long running operation that you want to avoid. At this point, you might wonder why not just to use a very high -value as default for this parameter. The reason behind this is that high maximum parallelism can have some impact on your -application's performance and even state sizes, because Flink has to maintain certain metadata for its ability to rescale which -can increase with the maximum parallelism. In general, you should choose a max parallelism that is high enough to fit your -future needs in scalability, but keeping it as low as possible can give slightly better performance. In particular, -a maximum parallelism higher that 128 will typically result in slightly bigger state snapshots from the keyed backends. +### Set An Explicit Max Parallelism -Notice that maximum parallelism must fulfill the following conditions: +The max parallelism, set on a per-job and per-operator granularity, determines the maximum parallelism to which a stateful operator can scale. +There is currently **no way to change** the maximum parallelism of an operator after a job has started without discarding that operators state. +The reason maximum parallelism exists, versus allowing stateful operators to be infinitely scalable, is that it has some impact on your application's performance and state size. +Flink has to maintain specific metadata for its ability to rescale state which grows linearly with max parallelism. 
+In general, you should choose max parallelism that is high enough to fit your future needs in scalability, while keeping it low enough to maintain reasonable performance. -`0 < parallelism <= max parallelism <= 2^15` +{% panel **Note:** Maximum parallelism must fulfill the following conditions: `0 < parallelism <= max parallelism <= 2^15` %} -You can set the maximum parallelism by `setMaxParallelism(int maxparallelism)`. By default, Flink will choose the maximum -parallelism as a function of the parallelism when the job is first started: +You can explicitly set maximum parallelism by using `setMaxParallelism(int maxparallelism)`. +If no max parallelism is set Flink will decide using a function of the operators parallelism when the job is first started: - `128` : for all parallelism <= 128. - `MIN(nextPowerOfTwo(parallelism + (parallelism / 2)), 2^15)` : for all parallelism > 128. -### Set UUIDs for operators +### Set UUIDs For All Operators -As mentioned in the documentation for [savepoints]({{ site.baseurl }}/ops/state/savepoints.html), users should set uids for -operators. Those operator uids are
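The default rule quoted in this diff is easy to misread, so here is a minimal plain-Java sketch of it. The class and method names are hypothetical, and this is not Flink's own implementation. It only encodes the two bullet points and the `0 < parallelism <= max parallelism <= 2^15` constraint as they are written in the diff.

```java
// Hypothetical helper class: encodes the rule from the diff, not Flink's source.
final class DefaultMaxParallelism {
    static final int LOWER_BOUND = 128;     // default max parallelism for small jobs
    static final int UPPER_BOUND = 1 << 15; // 2^15 = 32768

    // Smallest power of two >= x, for x >= 1.
    static int nextPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    // 128 for parallelism <= 128, otherwise MIN(nextPowerOfTwo(p + p / 2), 2^15).
    static int defaultMaxParallelism(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        if (parallelism <= LOWER_BOUND) {
            return LOWER_BOUND;
        }
        return Math.min(nextPowerOfTwo(parallelism + parallelism / 2), UPPER_BOUND);
    }

    // Checks 0 < parallelism <= maxParallelism <= 2^15.
    static boolean isValid(int parallelism, int maxParallelism) {
        return 0 < parallelism && parallelism <= maxParallelism && maxParallelism <= UPPER_BOUND;
    }
}
```

For example, a parallelism of 200 gives `nextPowerOfTwo(300)`, which is 512, while any parallelism up to 128 keeps the default of 128.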
[GitHub] [flink] fhueske commented on a change in pull request #8330: [FLINK-12388][docs] Update the production readiness checklist
fhueske commented on a change in pull request #8330: [FLINK-12388][docs] Update the production readiness checklist URL: https://github.com/apache/flink/pull/8330#discussion_r282917921 ## File path: docs/ops/production_ready.md ## @@ -22,79 +22,54 @@ specific language governing permissions and limitations under the License. --> +The production readiness checklist provides an overview of configuration options that should be carefully considered before bringing an Apache Flink job into production. +While the Flink community has attempted to provide sensible defaults for each configuration, it is important to review this list and ensure the options chosen are sufficient for your needs. + * ToC {:toc} -## Production Readiness Checklist - -Purpose of this production readiness checklist is to provide a condensed overview of configuration options that are -important and need **careful considerations** if you plan to bring your Flink job into **production**. For most of these options -Flink provides out-of-the-box defaults to make usage and adoption of Flink easier. For many users and scenarios, those -defaults are good starting points for development and completely sufficient for "one-shot" jobs. - -However, once you are planning to bring a Flink application to production the requirements typically increase. For example, -you want your job to be (re-)scalable and to have a good upgrade story for your job and new Flink versions. - -In the following, we present a collection of configuration options that you should check before your job goes into production. - -### Set maximum parallelism for operators explicitly - -Maximum parallelism is a configuration parameter that is newly introduced in Flink 1.2 and has important implications -for the (re-)scalability of your Flink job. This parameter, which can be set on a per-job and/or per-operator granularity, -determines the maximum parallelism to which you can scale operators. 
It is important to understand that (as of now) there -is **no way to change** this parameter after your job has been started, except for restarting your job completely -from scratch (i.e. with a new state, and not from a previous checkpoint/savepoint). Even if Flink would provide some way -to change maximum parallelism for existing savepoints in the future, you can already assume that for large states this is -likely a long running operation that you want to avoid. At this point, you might wonder why not just to use a very high -value as default for this parameter. The reason behind this is that high maximum parallelism can have some impact on your -application's performance and even state sizes, because Flink has to maintain certain metadata for its ability to rescale which -can increase with the maximum parallelism. In general, you should choose a max parallelism that is high enough to fit your -future needs in scalability, but keeping it as low as possible can give slightly better performance. In particular, -a maximum parallelism higher that 128 will typically result in slightly bigger state snapshots from the keyed backends. +### Set An Explicit Max Parallelism -Notice that maximum parallelism must fulfill the following conditions: +The max parallelism, set on a per-job and per-operator granularity, determines the maximum parallelism to which a stateful operator can scale. +There is currently **no way to change** the maximum parallelism of an operator after a job has started without discarding that operators state. +The reason maximum parallelism exists, versus allowing stateful operators to be infinitely scalable, is that it has some impact on your application's performance and state size. +Flink has to maintain specific metadata for its ability to rescale state which grows linearly with max parallelism. Review comment: nit: AFAIK it does not grow linearly but logarithmically (the key group id needs one more bit for each power-of-two step). Maybe just drop "linearly"?
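To make the reviewer's argument concrete: a key-group id lies in `[0, maxParallelism)`, so encoding it takes about log2(maxParallelism) bits, one more per power-of-two step. The sketch below only computes that bit count. The class name is hypothetical, and the sketch says nothing about how Flink actually serializes key-group prefixes or any other per-key-group metadata.

```java
// Illustration only: bits needed to encode a key-group id in [0, maxParallelism).
final class KeyGroupIdBits {
    static int bitsForKeyGroupId(int maxParallelism) {
        if (maxParallelism < 1) {
            throw new IllegalArgumentException("maxParallelism must be >= 1");
        }
        // The highest id is maxParallelism - 1; count bits up to its highest set bit.
        return 32 - Integer.numberOfLeadingZeros(maxParallelism - 1);
    }
}
```

Going from 128 to 256 adds one bit (7 to 8), and going all the way from 256 to 32768 adds only seven more.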
[GitHub] [flink] fhueske commented on a change in pull request #8330: [FLINK-12388][docs] Update the production readiness checklist
fhueske commented on a change in pull request #8330: [FLINK-12388][docs] Update the production readiness checklist URL: https://github.com/apache/flink/pull/8330#discussion_r282921904 ## File path: docs/ops/production_ready.md ## @@ -22,79 +22,54 @@ specific language governing permissions and limitations under the License. --> +The production readiness checklist provides an overview of configuration options that should be carefully considered before bringing an Apache Flink job into production. +While the Flink community has attempted to provide sensible defaults for each configuration, it is important to review this list and ensure the options chosen are sufficient for your needs. + * ToC {:toc} -## Production Readiness Checklist - -Purpose of this production readiness checklist is to provide a condensed overview of configuration options that are -important and need **careful considerations** if you plan to bring your Flink job into **production**. For most of these options -Flink provides out-of-the-box defaults to make usage and adoption of Flink easier. For many users and scenarios, those -defaults are good starting points for development and completely sufficient for "one-shot" jobs. - -However, once you are planning to bring a Flink application to production the requirements typically increase. For example, -you want your job to be (re-)scalable and to have a good upgrade story for your job and new Flink versions. - -In the following, we present a collection of configuration options that you should check before your job goes into production. - -### Set maximum parallelism for operators explicitly - -Maximum parallelism is a configuration parameter that is newly introduced in Flink 1.2 and has important implications -for the (re-)scalability of your Flink job. This parameter, which can be set on a per-job and/or per-operator granularity, -determines the maximum parallelism to which you can scale operators. 
It is important to understand that (as of now) there -is **no way to change** this parameter after your job has been started, except for restarting your job completely -from scratch (i.e. with a new state, and not from a previous checkpoint/savepoint). Even if Flink would provide some way -to change maximum parallelism for existing savepoints in the future, you can already assume that for large states this is -likely a long running operation that you want to avoid. At this point, you might wonder why not just to use a very high -value as default for this parameter. The reason behind this is that high maximum parallelism can have some impact on your -application's performance and even state sizes, because Flink has to maintain certain metadata for its ability to rescale which -can increase with the maximum parallelism. In general, you should choose a max parallelism that is high enough to fit your -future needs in scalability, but keeping it as low as possible can give slightly better performance. In particular, -a maximum parallelism higher that 128 will typically result in slightly bigger state snapshots from the keyed backends. +### Set An Explicit Max Parallelism -Notice that maximum parallelism must fulfill the following conditions: +The max parallelism, set on a per-job and per-operator granularity, determines the maximum parallelism to which a stateful operator can scale. +There is currently **no way to change** the maximum parallelism of an operator after a job has started without discarding that operators state. Review comment: un-highlight "no way to change"? The other aspects have similar issues and are not highlighted.
[GitHub] [flink] fhueske commented on a change in pull request #8330: [FLINK-12388][docs] Update the production readiness checklist
fhueske commented on a change in pull request #8330: [FLINK-12388][docs] Update the production readiness checklist URL: https://github.com/apache/flink/pull/8330#discussion_r282919635 ## File path: docs/ops/production_ready.md ## @@ -22,79 +22,54 @@ specific language governing permissions and limitations under the License. --> +The production readiness checklist provides an overview of configuration options that should be carefully considered before bringing an Apache Flink job into production. +While the Flink community has attempted to provide sensible defaults for each configuration, it is important to review this list and ensure the options chosen are sufficient for your needs. + * ToC {:toc} -## Production Readiness Checklist - -Purpose of this production readiness checklist is to provide a condensed overview of configuration options that are -important and need **careful considerations** if you plan to bring your Flink job into **production**. For most of these options -Flink provides out-of-the-box defaults to make usage and adoption of Flink easier. For many users and scenarios, those -defaults are good starting points for development and completely sufficient for "one-shot" jobs. - -However, once you are planning to bring a Flink application to production the requirements typically increase. For example, -you want your job to be (re-)scalable and to have a good upgrade story for your job and new Flink versions. - -In the following, we present a collection of configuration options that you should check before your job goes into production. - -### Set maximum parallelism for operators explicitly - -Maximum parallelism is a configuration parameter that is newly introduced in Flink 1.2 and has important implications -for the (re-)scalability of your Flink job. This parameter, which can be set on a per-job and/or per-operator granularity, -determines the maximum parallelism to which you can scale operators. 
It is important to understand that (as of now) there -is **no way to change** this parameter after your job has been started, except for restarting your job completely -from scratch (i.e. with a new state, and not from a previous checkpoint/savepoint). Even if Flink would provide some way -to change maximum parallelism for existing savepoints in the future, you can already assume that for large states this is -likely a long running operation that you want to avoid. At this point, you might wonder why not just to use a very high -value as default for this parameter. The reason behind this is that high maximum parallelism can have some impact on your -application's performance and even state sizes, because Flink has to maintain certain metadata for its ability to rescale which -can increase with the maximum parallelism. In general, you should choose a max parallelism that is high enough to fit your -future needs in scalability, but keeping it as low as possible can give slightly better performance. In particular, -a maximum parallelism higher that 128 will typically result in slightly bigger state snapshots from the keyed backends. +### Set An Explicit Max Parallelism -Notice that maximum parallelism must fulfill the following conditions: +The max parallelism, set on a per-job and per-operator granularity, determines the maximum parallelism to which a stateful operator can scale. +There is currently **no way to change** the maximum parallelism of an operator after a job has started without discarding that operators state. +The reason maximum parallelism exists, versus allowing stateful operators to be infinitely scalable, is that it has some impact on your application's performance and state size. +Flink has to maintain specific metadata for its ability to rescale state which grows linearly with max parallelism. 
+In general, you should choose max parallelism that is high enough to fit your future needs in scalability, while keeping it low enough to maintain reasonable performance. -`0 < parallelism <= max parallelism <= 2^15` +{% panel **Note:** Maximum parallelism must fulfill the following conditions: `0 < parallelism <= max parallelism <= 2^15` %} -You can set the maximum parallelism by `setMaxParallelism(int maxparallelism)`. By default, Flink will choose the maximum -parallelism as a function of the parallelism when the job is first started: +You can explicitly set maximum parallelism by using `setMaxParallelism(int maxparallelism)`. +If no max parallelism is set Flink will decide using a function of the operators parallelism when the job is first started: - `128` : for all parallelism <= 128. - `MIN(nextPowerOfTwo(parallelism + (parallelism / 2)), 2^15)` : for all parallelism > 128. -### Set UUIDs for operators +### Set UUIDs For All Operators -As mentioned in the documentation for [savepoints]({{ site.baseurl }}/ops/state/savepoints.html), users should set uids for -operators. Those operator uids are
[GitHub] [flink] fhueske commented on a change in pull request #8330: [FLINK-12388][docs] Update the production readiness checklist
fhueske commented on a change in pull request #8330: [FLINK-12388][docs] Update the production readiness checklist URL: https://github.com/apache/flink/pull/8330#discussion_r282920531 ## File path: docs/ops/production_ready.md ## @@ -22,79 +22,54 @@ specific language governing permissions and limitations under the License. --> +The production readiness checklist provides an overview of configuration options that should be carefully considered before bringing an Apache Flink job into production. +While the Flink community has attempted to provide sensible defaults for each configuration, it is important to review this list and ensure the options chosen are sufficient for your needs. + * ToC {:toc} -## Production Readiness Checklist - -Purpose of this production readiness checklist is to provide a condensed overview of configuration options that are -important and need **careful considerations** if you plan to bring your Flink job into **production**. For most of these options -Flink provides out-of-the-box defaults to make usage and adoption of Flink easier. For many users and scenarios, those -defaults are good starting points for development and completely sufficient for "one-shot" jobs. - -However, once you are planning to bring a Flink application to production the requirements typically increase. For example, -you want your job to be (re-)scalable and to have a good upgrade story for your job and new Flink versions. - -In the following, we present a collection of configuration options that you should check before your job goes into production. - -### Set maximum parallelism for operators explicitly - -Maximum parallelism is a configuration parameter that is newly introduced in Flink 1.2 and has important implications -for the (re-)scalability of your Flink job. This parameter, which can be set on a per-job and/or per-operator granularity, -determines the maximum parallelism to which you can scale operators. 
It is important to understand that (as of now) there -is **no way to change** this parameter after your job has been started, except for restarting your job completely -from scratch (i.e. with a new state, and not from a previous checkpoint/savepoint). Even if Flink would provide some way -to change maximum parallelism for existing savepoints in the future, you can already assume that for large states this is -likely a long running operation that you want to avoid. At this point, you might wonder why not just to use a very high -value as default for this parameter. The reason behind this is that high maximum parallelism can have some impact on your -application's performance and even state sizes, because Flink has to maintain certain metadata for its ability to rescale which -can increase with the maximum parallelism. In general, you should choose a max parallelism that is high enough to fit your -future needs in scalability, but keeping it as low as possible can give slightly better performance. In particular, -a maximum parallelism higher that 128 will typically result in slightly bigger state snapshots from the keyed backends. +### Set An Explicit Max Parallelism -Notice that maximum parallelism must fulfill the following conditions: +The max parallelism, set on a per-job and per-operator granularity, determines the maximum parallelism to which a stateful operator can scale. +There is currently **no way to change** the maximum parallelism of an operator after a job has started without discarding that operators state. +The reason maximum parallelism exists, versus allowing stateful operators to be infinitely scalable, is that it has some impact on your application's performance and state size. +Flink has to maintain specific metadata for its ability to rescale state which grows linearly with max parallelism. 
+In general, you should choose max parallelism that is high enough to fit your future needs in scalability, while keeping it low enough to maintain reasonable performance. -`0 < parallelism <= max parallelism <= 2^15` +{% panel **Note:** Maximum parallelism must fulfill the following conditions: `0 < parallelism <= max parallelism <= 2^15` %} -You can set the maximum parallelism by `setMaxParallelism(int maxparallelism)`. By default, Flink will choose the maximum -parallelism as a function of the parallelism when the job is first started: +You can explicitly set maximum parallelism by using `setMaxParallelism(int maxparallelism)`. +If no max parallelism is set Flink will decide using a function of the operators parallelism when the job is first started: - `128` : for all parallelism <= 128. - `MIN(nextPowerOfTwo(parallelism + (parallelism / 2)), 2^15)` : for all parallelism > 128. -### Set UUIDs for operators +### Set UUIDs For All Operators -As mentioned in the documentation for [savepoints]({{ site.baseurl }}/ops/state/savepoints.html), users should set uids for -operators. Those operator uids are
[GitHub] [flink] yanghua commented on a change in pull request #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore
yanghua commented on a change in pull request #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore URL: https://github.com/apache/flink/pull/8410#discussion_r282921569 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ## @@ -183,6 +184,8 @@ /** Registry that tracks state which is shared across (incremental) checkpoints. */ private SharedStateRegistry sharedStateRegistry; + private boolean isPerferCheckpointForRecovery; Review comment: Sorry for the typo; I will fix it soon.
[GitHub] [flink] yanghua commented on a change in pull request #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore
yanghua commented on a change in pull request #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore URL: https://github.com/apache/flink/pull/8410#discussion_r282921234 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java ## @@ -72,6 +72,9 @@ /** Determines if a tasks are failed or not if there is an error in their checkpointing. Default: true */ private boolean failOnCheckpointingErrors = true; + /** Determines if a job will fallback to checkpoint when there is a savepoint later than checkpoint. **/ + private boolean perferCheckpointForRecovery = false; Review comment: I think this config option belongs with the checkpoint settings, so providing it in `CheckpointConfig` sounds more reasonable. Moreover, it is a job-level config option, right? However, `flink-conf.yaml` is a cluster-level configuration file (if users deploy Flink in standalone mode, it affects all jobs).
[GitHub] [flink] yanghua commented on a change in pull request #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore
yanghua commented on a change in pull request #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore URL: https://github.com/apache/flink/pull/8410#discussion_r282919368 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ## @@ -1093,6 +1098,23 @@ public boolean restoreLatestCheckpointedState( } } + CompletedCheckpoint candidate = null; + if (isPerferCheckpointForRecovery && completedCheckpointStore.getAllCheckpoints().size() > 1) { Review comment: Agreed.
[GitHub] [flink] gyfora commented on a change in pull request #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore
gyfora commented on a change in pull request #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore URL: https://github.com/apache/flink/pull/8410#discussion_r282908128 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ## @@ -1093,6 +1098,23 @@ public boolean restoreLatestCheckpointedState( } } + CompletedCheckpoint candidate = null; + if (isPerferCheckpointForRecovery && completedCheckpointStore.getAllCheckpoints().size() > 1) { Review comment: It feels like this part basically reimplements the `getLatestCheckpoint()` logic of the completed checkpoint store. Instead of having this here, maybe `getLatestCheckpoint()` could take the `preferCheckpoints` boolean parameter, for better encapsulation.
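The suggestion is to move this selection into the store's `getLatestCheckpoint()` behind a boolean flag. Below is a minimal sketch of what such a method could look like, assuming checkpoints are kept oldest to newest and each entry knows whether it is a savepoint. The types and signature are hypothetical, not the PR's or Flink's actual API; the behavior follows the Javadoc quoted from `CheckpointConfig` in this thread (fall back to a checkpoint when a savepoint is newer).

```java
import java.util.List;
import java.util.Optional;

// Hypothetical types; not Flink's CompletedCheckpointStore API.
final class PreferCheckpointSketch {
    static final class Completed {
        final long id;
        final boolean isSavepoint;

        Completed(long id, boolean isSavepoint) {
            this.id = id;
            this.isSavepoint = isSavepoint;
        }
    }

    // Assumes `all` is ordered from oldest to newest.
    static Optional<Completed> getLatestCheckpoint(List<Completed> all, boolean preferCheckpointForRecovery) {
        if (all.isEmpty()) {
            return Optional.empty();
        }
        Completed latest = all.get(all.size() - 1);
        if (!preferCheckpointForRecovery || !latest.isSavepoint) {
            return Optional.of(latest);
        }
        // The newest entry is a savepoint: fall back to the newest regular checkpoint, if any.
        for (int i = all.size() - 2; i >= 0; i--) {
            if (!all.get(i).isSavepoint) {
                return Optional.of(all.get(i));
            }
        }
        return Optional.of(latest); // only savepoints are available
    }
}
```

Returning the savepoint when no regular checkpoint exists is a choice made in this sketch; the PR may handle that case differently.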
[GitHub] [flink] gyfora commented on a change in pull request #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore
gyfora commented on a change in pull request #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore URL: https://github.com/apache/flink/pull/8410#discussion_r282907507 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ## @@ -183,6 +184,8 @@ /** Registry that tracks state which is shared across (incremental) checkpoints. */ private SharedStateRegistry sharedStateRegistry; + private boolean isPerferCheckpointForRecovery; Review comment: There is a typo in basically every use of the word "preferCheckpointForRecovery": it should be "prefer", not "perfer". I think the easiest fix is to search the changes and correct it everywhere.
[GitHub] [flink] gyfora commented on a change in pull request #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore
gyfora commented on a change in pull request #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore URL: https://github.com/apache/flink/pull/8410#discussion_r282909021 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java ## @@ -72,6 +72,9 @@ /** Determines if a tasks are failed or not if there is an error in their checkpointing. Default: true */ private boolean failOnCheckpointingErrors = true; + /** Determines if a job will fallback to checkpoint when there is a savepoint later than checkpoint. **/ + private boolean perferCheckpointForRecovery = false; Review comment: I am personally not a big fan of configuring checkpoint parameters in the `CheckpointConfig` class; I actually wish that all these options were exposed as general Flink configuration parameters (settable in `flink-conf.yaml`). What do you think, @StefanRRichter?
[GitHub] [flink] azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor
azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor URL: https://github.com/apache/flink/pull/7757#discussion_r282897244 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ## @@ -348,19 +352,26 @@ private void handleStartTaskExecutorServicesException(Exception e) throws Except resourceManagerHeartbeatManager.stop(); try { - stopTaskExecutorServices(); + CompletableFuture onStopFuture = stopTaskExecutorServices(failingTaskFutures); + log.info("Stopped TaskExecutor {}.", getAddress()); Review comment: This log actually happens after `taskManagerMetricGroup.close()`.
[GitHub] [flink] azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor
azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor URL: https://github.com/apache/flink/pull/7757#discussion_r282884262 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ## @@ -335,9 +336,12 @@ private void handleStartTaskExecutorServicesException(Exception e) throws Except resourceManagerConnection.close(); } + List> failingTaskFutures = new ArrayList<>(); for (JobManagerConnection jobManagerConnection : jobManagerConnections.values()) { try { - disassociateFromJobManager(jobManagerConnection, new FlinkException("The TaskExecutor is shutting down.")); + FlinkException cause = new FlinkException("The TaskExecutor is shutting down."); + failTasks(failingTaskFutures, jobManagerConnection, cause); Review comment: I would rather make `failTasks` return `List>` instead of passing the list as an argument, and then call `failingTaskFutures.addAll(failTasks(jobManagerConnection, cause));`. This way it is more explicit what is happening.
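Here is a Flink-free sketch of the shape suggested in this comment, where `failTasks` returns its futures and the caller does the `addAll`. `SimpleTask` and `failAll` are hypothetical stand-ins for `Task` and the shutdown loop. `CompletableFuture<Void>` is an assumed element type, because the generic parameters were lost in this archive.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the "return the list" shape; not the PR's code.
final class FailTasksReturnSketch {
    interface SimpleTask {
        CompletableFuture<Void> failExternally(Throwable cause);
    }

    // Collects and returns the futures instead of mutating a caller-supplied list.
    static List<CompletableFuture<Void>> failTasks(Iterator<SimpleTask> tasks, Throwable cause) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        while (tasks.hasNext()) {
            futures.add(tasks.next().failExternally(cause));
        }
        return futures;
    }

    // Call site: the accumulation is visible here, then all futures are combined.
    static CompletableFuture<Void> failAll(List<List<SimpleTask>> tasksPerJob, Throwable cause) {
        List<CompletableFuture<Void>> failingTaskFutures = new ArrayList<>();
        for (List<SimpleTask> jobTasks : tasksPerJob) {
            failingTaskFutures.addAll(failTasks(jobTasks.iterator(), cause));
        }
        return CompletableFuture.allOf(failingTaskFutures.toArray(new CompletableFuture<?>[0]));
    }
}
```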
[GitHub] [flink] azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor
azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor URL: https://github.com/apache/flink/pull/7757#discussion_r282882662 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ## @@ -269,6 +272,9 @@ /** Initialized from the Flink configuration. May also be set at the ExecutionConfig */ private long taskCancellationTimeout; + /** Future which is completed once the Task#run method is completed */ + private CompletableFuture taskRunFuture = new CompletableFuture<>(); Review comment: Maybe `taskCompletionFuture` would be a slightly better name.
[GitHub] [flink] azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor
azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor URL: https://github.com/apache/flink/pull/7757#discussion_r282888225 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ## @@ -348,19 +352,26 @@ private void handleStartTaskExecutorServicesException(Exception e) throws Except resourceManagerHeartbeatManager.stop(); try { - stopTaskExecutorServices(); + CompletableFuture onStopFuture = stopTaskExecutorServices(failingTaskFutures); + log.info("Stopped TaskExecutor {}.", getAddress()); + return onStopFuture; } catch (Exception e) { throwable = ExceptionUtils.firstOrSuppressed(e, throwable); - } - - if (throwable != null) { return FutureUtils.completedExceptionally(new FlinkException("Error while shutting the TaskExecutor down.", throwable)); - } else { - log.info("Stopped TaskExecutor {}.", getAddress()); - return CompletableFuture.completedFuture(null); } } + private void failTasks(List> failingTaskFutures, + JobManagerConnection jobManagerConnection, + Throwable cause) { + Iterator tasks = taskSlotTable.getTasks(jobManagerConnection.getJobID()); + while (tasks.hasNext()) { + try { + failingTaskFutures.add(tasks.next().failExternally(cause)); + } catch (IllegalStateException ignored) {} Review comment: Why is the exception ignored? Shouldn't we rather add it as a failed future, e.g. `failingTaskFutures.add(FutureUtils.completedExceptionally(ignored))`?
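`CompletableFuture#completeExceptionally` returns a `boolean`, not the future, so a failed future has to be created first and then completed. Below is a minimal sketch of recording the `IllegalStateException` instead of swallowing it. It uses a hypothetical `SimpleTask` stand-in and an assumed `CompletableFuture<Void>` element type, and the local `completedExceptionally` helper is illustrative only (Java 9+ also offers `CompletableFuture.failedFuture`).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; not the PR's code.
final class FailedFutureSketch {
    interface SimpleTask {
        // May throw IllegalStateException, like failExternally in the quoted diff.
        CompletableFuture<Void> failExternally(Throwable cause);
    }

    // Java 8 stand-in for CompletableFuture.failedFuture (added in Java 9).
    static <T> CompletableFuture<T> completedExceptionally(Throwable error) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(error); // returns a boolean, not the future
        return future;
    }

    static List<CompletableFuture<Void>> failTasks(Iterator<SimpleTask> tasks, Throwable cause) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        while (tasks.hasNext()) {
            try {
                futures.add(tasks.next().failExternally(cause));
            } catch (IllegalStateException e) {
                // Keep the failure visible to whoever waits on these futures.
                futures.add(completedExceptionally(e));
            }
        }
        return futures;
    }
}
```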
[GitHub] [flink] azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor
azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor
URL: https://github.com/apache/flink/pull/7757#discussion_r282882465

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ##
@@ -1000,28 +1008,28 @@ public void cancelExecution() {
 * This method never blocks.
 */
 @Override
- public void failExternally(Throwable cause) {
+ public CompletableFuture failExternally(Throwable cause) {

Review comment:
If we add a public `Task#getCompletionFuture()` method, we can avoid all the changes in `failExternally`/`cancelOrFailAndCancelInvokable`. Those methods can still be used to trigger the cancellation. What we are ultimately interested in, though, is the completion of `Task#run`, which is a subsequent result of the cancellation.
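Here is a heavily simplified, hypothetical task (not Flink's `Task` class) that illustrates the suggested split between triggering cancellation and observing completion. `failExternally` keeps a `void` signature and only sets a flag. A separate `getCompletionFuture()` exposes a future that `run()` completes in a `finally` block.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, heavily simplified task; NOT org.apache.flink.runtime.taskmanager.Task.
final class SketchTask implements Runnable {

    // Completed exactly once, when run() returns, whether normally or by exception.
    private final CompletableFuture<Void> taskCompletionFuture = new CompletableFuture<>();
    private final AtomicBoolean canceled = new AtomicBoolean(false);

    @Override
    public void run() {
        try {
            while (!canceled.get()) {
                Thread.yield(); // stand-in for the task's actual work
            }
        } finally {
            taskCompletionFuture.complete(null);
        }
    }

    // Only triggers cancellation and returns immediately; the cause is ignored in this sketch.
    void failExternally(Throwable cause) {
        canceled.set(true);
    }

    // Callers that need to wait for termination observe this future instead.
    CompletableFuture<Void> getCompletionFuture() {
        return taskCompletionFuture;
    }
}
```

With this split, a shutdown routine would call `failExternally` on every task and then combine the futures returned by `getCompletionFuture()`, leaving the cancellation signature untouched.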
[GitHub] [flink] azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor
azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor
URL: https://github.com/apache/flink/pull/7757#discussion_r282884777

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ##
@@ -348,19 +352,26 @@ private void handleStartTaskExecutorServicesException(Exception e) throws Except
 resourceManagerHeartbeatManager.stop();
 try {
- stopTaskExecutorServices();
+ CompletableFuture onStopFuture = stopTaskExecutorServices(failingTaskFutures);
+ log.info("Stopped TaskExecutor {}.", getAddress());
+ return onStopFuture;

Review comment:
If `throwable != null` when we reach `return onStopFuture;`, that error is silently dropped, which was not the case previously.
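One way to avoid dropping the earlier error uses plain `CompletableFuture` operations. It assumes the collected `throwable` should still fail the shutdown future once stopping has finished. `OnStopSketch.combine` is a hypothetical helper, not part of Flink.

```java
import java.util.concurrent.CompletableFuture;

final class OnStopSketch {

    // If an error was collected before the services were stopped, the returned future
    // fails with that error once onStopFuture is done. A failure of onStopFuture itself
    // is attached as a suppressed exception. Without an earlier error, onStopFuture is
    // returned unchanged.
    static CompletableFuture<Void> combine(Throwable earlier, CompletableFuture<Void> onStopFuture) {
        if (earlier == null) {
            return onStopFuture;
        }
        return onStopFuture
                .handle((ignored, stopError) -> {
                    if (stopError != null && stopError != earlier) {
                        earlier.addSuppressed(stopError);
                    }
                    return null;
                })
                .thenCompose(ignored -> {
                    CompletableFuture<Void> failed = new CompletableFuture<>();
                    failed.completeExceptionally(earlier);
                    return failed;
                });
    }
}
```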
[GitHub] [flink] azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor
azagrebin commented on a change in pull request #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor
URL: https://github.com/apache/flink/pull/7757#discussion_r282896944

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ##
@@ -396,6 +407,50 @@ private void stopTaskExecutorServices() throws Exception {
 ExceptionUtils.tryRethrowException(exception);
 }
+ private CompletableFuture stopTaskExecutorServices(
+ List> stoppingTaskFutures) throws Exception {
+ Exception exception = null;
+
+ try {
+ jobLeaderService.stop();

Review comment:
This looks like code duplicated from the existing `stopTaskExecutorServices()` method. Can we split `stopTaskExecutorServices` into two methods and call them respectively in `stopTaskExecutorServices` and `onStop`? Then we would not need `stopTaskExecutorServices(stoppingTaskFutures)`.
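Here is a hypothetical sketch of the proposed split. The services are stopped in two halves: the synchronous path calls both halves directly, and the `onStop` path runs the second half only after all task futures have completed. The method names and the labels recorded in `stopped` are illustrative assumptions, not the actual `TaskExecutor` code or its real services.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical shape of the suggested refactoring; not the real TaskExecutor code.
final class ShutdownSketch {

    // Records the order in which the two halves ran, so the behaviour can be observed.
    final List<String> stopped = new ArrayList<>();

    // First half: services that can be stopped while tasks may still be running.
    void stopEarlyServices() {
        stopped.add("earlyServices");
    }

    // Second half: services that running tasks may still depend on.
    void stopLateServices() {
        stopped.add("lateServices");
    }

    // Synchronous path: both halves back to back.
    void stopTaskExecutorServices() {
        stopEarlyServices();
        stopLateServices();
    }

    // onStop path: the second half runs once every task future has completed,
    // normally or exceptionally; a task failure still fails the returned future.
    CompletableFuture<Void> onStop(List<CompletableFuture<Void>> taskFutures) {
        stopEarlyServices();
        return CompletableFuture.allOf(taskFutures.toArray(new CompletableFuture<?>[0]))
                .whenComplete((ignored, error) -> stopLateServices());
    }
}
```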
[GitHub] [flink] dawidwys commented on issue #8387: [FLINK-11982] [table] File System Connector's support JSON Format and JSON file BatchTableSource
dawidwys commented on issue #8387: [FLINK-11982] [table] File System Connector's support JSON Format and JSON file BatchTableSource
URL: https://github.com/apache/flink/pull/8387#issuecomment-491308505

Ad. 1
Once again, we aim to separate connectors and formats => JsonFile = Json + File.

Ad. 2
It provides a `FileSystem` descriptor, not a connector. See my comments about `CsvTableSource`. Its design is broken and we do not want to repeat this error.

Ad. 3
Good to know, but Hive is not the source of truth. Unfortunately, we cannot accept general-purpose code that works only in a single, very narrow setup.
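The connector/format separation argued for here can be sketched in a few lines. The "connector" only knows how to produce raw records, the "format" only knows how to turn one raw record into a value, and a JSON file source is one combination of the two. The interfaces below are hypothetical and much simpler than Flink's `DeserializationSchema` and table source APIs. File content is passed in as a string to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical "format": turns one raw record into a value; knows nothing about files.
interface RecordDeserializer<T> {
    T deserialize(String rawRecord);
}

// Hypothetical "connector": obtains raw records (here: lines of already-loaded file
// content) and knows nothing about JSON, CSV, or any other format.
final class LineDelimitedSource<T> {
    private final RecordDeserializer<T> format;

    LineDelimitedSource(RecordDeserializer<T> format) {
        this.format = format;
    }

    List<T> read(String fileContent) {
        List<T> records = new ArrayList<>();
        for (String line : fileContent.split("\\R")) {
            if (!line.isEmpty()) {
                records.add(format.deserialize(line));
            }
        }
        return records;
    }
}
```

A JSON format would be one more `RecordDeserializer` implementation, for example one delegating to a JSON library. The source class itself would not change.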
[GitHub] [flink] hequn8128 commented on issue #8359: [FLINK-11051][table] Add streaming window FlatAggregate to Table API
hequn8128 commented on issue #8359: [FLINK-11051][table] Add streaming window FlatAggregate to Table API
URL: https://github.com/apache/flink/pull/8359#issuecomment-491306744

@sunjincheng121 Thank you very much for your suggestions and meticulous review. I have addressed all your comments and updated the PR. It would be great if you could take another look. Thank you.

Best,
Hequn
[GitHub] [flink] dawidwys commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database
dawidwys commented on a change in pull request #8390: [FLINK-12469][table] Clean up catalog API on default/current database
URL: https://github.com/apache/flink/pull/8390#discussion_r282904007

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java ##
@@ -65,9 +64,15 @@
 private final Map> partitionColumnStats;
 public GenericInMemoryCatalog(String name) {
+ this(name, DEFAULT_DB);
+ }
+
+ public GenericInMemoryCatalog(String name, String defaultDatabase) {

Review comment:
No, what I mean is to allow the user to provide the default database. In other words, to have a ctor that does not create the default database itself but uses the one provided by the user. So the ctor would look like:
```
public GenericInMemoryCatalog(String name, String defaultDbName, CatalogDatabase defaultDb) {
	checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty");
	this.currentDatabase = defaultDbName;
	this.catalogName = name;
	this.databases = new LinkedHashMap<>();
	this.databases.put(defaultDbName, defaultDb);
	this.tables = new LinkedHashMap<>();
	this.functions = new LinkedHashMap<>();
	this.partitions = new LinkedHashMap<>();
	this.tableStats = new LinkedHashMap<>();
	this.tableColumnStats = new LinkedHashMap<>();
	this.partitionStats = new LinkedHashMap<>();
	this.partitionColumnStats = new LinkedHashMap<>();
}
```
[GitHub] [flink] ambition119 edited a comment on issue #8387: [FLINK-11982] [table] File System Connector's support JSON Format and JSON file BatchTableSource
ambition119 edited a comment on issue #8387: [FLINK-11982] [table] File System Connector's support JSON Format and JSON file BatchTableSource
URL: https://github.com/apache/flink/pull/8387#issuecomment-491298616

> Hi @ambition119
> I agree with @StephanEwen that this PR mixes the concepts of `connector`(file) and `format`(json).
>
> I don't necessarily understand the comment that `JsonRowFormatFactory` does not support File System Connector. First, there is no file system connector, and second of all this is a format factory thus it should have no notion of connector.
>
> I think you could rework your `JsonBatchTableSource` and `JsonRowInputFormat` to accept any `DeserializationSchema` similar to how `org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase` works.
>
> Another issue on a more conceptual layer is that the proposed `RowInputFormat` assumes that each line in a file is a separate record. I agree this is probably the most common case, but not the only one.
> Files can be also written with a different layout (e.g. Parquet), thus we should differentiate that also on the connector level.
>
> I'm guessing you might have been inspired by `CsvTableSource`, but it was one of the first `TableSource`s that was written before we decided to split connectors and formats and its design is flawed. It also uses `RowCsvInputFormat` that as I said in the previous paragraph applies custom block(in this case line) splitting based on configurable delimiter.

Thank you for your reply and discussion.

First, the naming of this PR is a bit confusing. The code mainly supports JSON files, for example on a local file system or HDFS, so I can rename it to **JsonFileXxx**.

Second, Flink does have a File System Connector; see the official documentation at https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/connect.html#file-system-connector. It provides [CsvBatchTableSourceFactory](https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory#L16) for reading CSV files.

![image](https://user-images.githubusercontent.com/20353043/57529676-0fc6f180-7368-11e9-9828-bb1644013972.png)

Flink master contains a unit test for [FileSystem connector CSV Format](https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala#L59). Here is an example:

```java
public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

    FileSystem connector = new FileSystem().path("/path/to/csv/in.csv");

    OldCsv oldCsv = new OldCsv()
        .field("a", Types.STRING())
        .field("b", Types.INT())
        .field("c", Types.BOOLEAN());

    Schema schema = new Schema()
        .field("a", Types.STRING())
        .field("b", Types.INT())
        .field("c", Types.BOOLEAN());

    tEnv.connect(connector)
        .withFormat(oldCsv)
        .withSchema(schema)
        .registerTableSource("csvTable");

    String fullSql = "select * FROM csvTable";
    Table fullTable = tEnv.sqlQuery(fullSql);
    DataSet<Row> fullDS = tEnv.toDataSet(fullTable, Row.class);
    List<Row> result = fullDS.collect();
    System.out.println(result);
}
```

Third, Hive's JSON file support also treats each line as a separate record; other layouts are not supported. For example:

```json
{ "fruit": [{ "weight": 8, "type": "apple" }, { "weight": 9, "type": "pear" }], "author": "J. R. R. Tolkien", "title": "The Lord of the Rings", "category": "fiction", "price": 22.99, "isbn": "0-395-19395-8", "email": "amy@only_for_json_udf_test.net", "owner": "amy", "zip_code": "94025", "fb_id": "1234" }
```

Thanks.
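The disagreement about "one line = one record" can be made concrete. If the record boundary is a pluggable, connector-level strategy, JSON Lines files (as in the Hive example) and multi-line documents can share the same format. This hypothetical sketch covers only text layouts. Binary layouts such as Parquet need a different reader altogether.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical strategy deciding where one record ends and the next begins.
interface RecordSplitter {
    List<String> split(String fileContent);
}

// JSON Lines / Hive-style layout: every non-blank line is one complete record.
final class NewlineSplitter implements RecordSplitter {
    @Override
    public List<String> split(String fileContent) {
        return Arrays.stream(fileContent.split("\\R"))
                .filter(line -> !line.trim().isEmpty())
                .collect(Collectors.toList());
    }
}

// Whole-file layout: e.g. a single pretty-printed JSON document spanning many lines.
final class WholeFileSplitter implements RecordSplitter {
    @Override
    public List<String> split(String fileContent) {
        return Collections.singletonList(fileContent);
    }
}
```

Applying `NewlineSplitter` to a pretty-printed document yields fragments such as `{` that no JSON deserializer can parse. This is the failure mode of hard-coding line splitting in the format.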
[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r282898836

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ##
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+	private final SchedulerOperations schedulerOperations;
+
+	private final SchedulingTopology schedulingTopology;
+
+	private final Map deploymentOptions;
+
+	private final Map intermediateDataSets;
+
+	public LazyFromSourcesSchedulingStrategy(
+			SchedulerOperations schedulerOperations,
+			SchedulingTopology schedulingTopology) {
+		this.schedulerOperations = checkNotNull(schedulerOperations);
+		this.schedulingTopology = checkNotNull(schedulingTopology);
+		this.intermediateDataSets = new HashMap<>();
+		this.deploymentOptions = new HashMap<>();
+	}
+
+	@Override
+	public void startScheduling() {
+		List executionVertexDeploymentOptions = new ArrayList<>();
+		DeploymentOption updateOption = new DeploymentOption(true);
+		DeploymentOption nonUpdateOption = new DeploymentOption(false);
+
+		for (SchedulingExecutionVertex schedulingVertex : schedulingTopology.getVertices()) {
+			DeploymentOption option = nonUpdateOption;
+			for (SchedulingResultPartition srp : schedulingVertex.getProducedResultPartitions()) {
+				SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(),
+					(key) -> new SchedulingIntermediateDataSet());
+				sid.addSchedulingResultPartition(srp);
+				if (srp.getPartitionType().isPipelined()) {
+					option = updateOption;
+				}
+			}
+			deploymentOptions.put(schedulingVertex.getId(), option);
+
+			if (schedulingVertex.getConsumedResultPartitions().isEmpty()) {
+				// schedule vertices without consumed result partition
+				executionVertexDeploymentOptions.add(
+					new ExecutionVertexDeploymentOption(schedulingVertex.getId(), option));
+			}
+		}
+
+		schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+	}
+
+	@Override
+	public void restartTasks(Set verticesToRestart) {
+		// increase counter of the dataset first
+		for (ExecutionVertexID executionVertexId :
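As a rough mental model of "lazy from sources", this hypothetical sketch deploys source vertices immediately and deploys a consumer only once all of its producers have finished. It models only the "all inputs" case; Flink's `InputDependencyConstraint`, imported in the diff, also has an `ANY` mode. It also ignores the pipelined-partition handling that `startScheduling` above expresses through `DeploymentOption`. Vertex ids are plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of lazy-from-sources scheduling; not Flink's scheduler API.
final class LazySchedulerSketch {

    private final Map<String, List<String>> inputsOf = new LinkedHashMap<>();    // vertex -> producers it consumes
    private final Map<String, List<String>> consumersOf = new LinkedHashMap<>(); // producer -> consuming vertices
    private final Set<String> finished = new HashSet<>();
    private final Set<String> deployed = new HashSet<>();
    final List<String> deployOrder = new ArrayList<>();

    void addVertex(String id, List<String> inputs) {
        inputsOf.put(id, inputs);
        for (String producer : inputs) {
            consumersOf.computeIfAbsent(producer, key -> new ArrayList<>()).add(id);
        }
    }

    // Vertices that consume nothing (the sources) are deployed right away.
    void startScheduling() {
        for (Map.Entry<String, List<String>> entry : inputsOf.entrySet()) {
            if (entry.getValue().isEmpty()) {
                deploy(entry.getKey());
            }
        }
    }

    // Called when a producer has finished; a consumer is deployed once ALL its producers are finished.
    void onFinished(String producer) {
        finished.add(producer);
        for (String consumer : consumersOf.getOrDefault(producer, Collections.emptyList())) {
            if (!deployed.contains(consumer) && finished.containsAll(inputsOf.get(consumer))) {
                deploy(consumer);
            }
        }
    }

    private void deploy(String id) {
        if (deployed.add(id)) {
            deployOrder.add(id);
        }
    }
}
```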
[GitHub] [flink] yanghua commented on issue #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore
yanghua commented on issue #8410: [FLINK-11159] Allow configuration whether to fall back to savepoints for restore
URL: https://github.com/apache/flink/pull/8410#issuecomment-491302415

@gyfora If you have time, could you review this PR for me? Thanks.
[jira] [Assigned] (FLINK-12473) Add the interface of ML pipeline and ML lib
[ https://issues.apache.org/jira/browse/FLINK-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shaoxuan Wang reassigned FLINK-12473:
-

Assignee: Luo Gen

> Add the interface of ML pipeline and ML lib
> ---
>
> Key: FLINK-12473
> URL: https://issues.apache.org/jira/browse/FLINK-12473
> Project: Flink
> Issue Type: Sub-task
> Components: Library / Machine Learning
> Reporter: Shaoxuan Wang
> Assignee: Luo Gen
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2019-05-10-12-50-15-869.png
>
> Original Estimate: 168h
> Time Spent: 10m
> Remaining Estimate: 167h 50m
>
> This Jira will introduce the major interfaces for ML pipeline and ML lib.
> The major interfaces and their relationships are shown in the diagram below. For more details, please refer to the [FLIP39 design doc|https://docs.google.com/document/d/1StObo1DLp8iiy0rbukx8kwAJb0BwDZrQrMWub3DzsEo].
>
> !image-2019-05-10-12-50-15-869.png!
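For readers unfamiliar with the pipeline abstraction this issue refers to, here is a generic Estimator/Transformer sketch in the style popularized by scikit-learn and Spark ML. An estimator is fitted on data and yields a transformer, and a pipeline chains stages. The names and signatures are assumptions for illustration and are not the interfaces defined in the FLIP39 design document.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interfaces in the generic Estimator/Transformer style; not the FLIP39 API.
interface Transformer<T> {
    T transform(T input);
}

interface Estimator<T> {
    // Learns from training data and returns the fitted model as a Transformer.
    Transformer<T> fit(T trainingData);
}

final class Pipeline<T> {
    private final List<Object> stages = new ArrayList<>(); // each stage is an Estimator<T> or a Transformer<T>

    Pipeline<T> append(Estimator<T> estimator) {
        stages.add(estimator);
        return this;
    }

    Pipeline<T> append(Transformer<T> transformer) {
        stages.add(transformer);
        return this;
    }

    // Fits estimators in order, feeding each stage the output of the previous one,
    // and returns a Transformer that applies all fitted stages.
    @SuppressWarnings("unchecked")
    Transformer<T> fit(T trainingData) {
        List<Transformer<T>> fitted = new ArrayList<>();
        T current = trainingData;
        for (Object stage : stages) {
            Transformer<T> model = stage instanceof Estimator
                    ? ((Estimator<T>) stage).fit(current)
                    : (Transformer<T>) stage;
            current = model.transform(current);
            fitted.add(model);
        }
        return input -> {
            T out = input;
            for (Transformer<T> stageModel : fitted) {
                out = stageModel.transform(out);
            }
            return out;
        };
    }
}
```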
[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r282892019

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java ##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+
+/**
+ * Strategy test utilities.
+ */
+public class StrategyTestUtil {
+
+	static void initVerticesAndPartitions(
+			TestingSchedulingTopology schedulingTopology,
+			ResultPartitionType[] partitionTypes,
+			InputDependencyConstraint[] inputDependencyConstraints,
+			TestingSchedulingExecutionVertex[][] vertices,
+			TestingSchedulingResultPartition[][] partitions) {
+
+		ResultPartitionID[][] resultPartitionIds = new ResultPartitionID[vertices.length - 1][vertices[0].length];
+		initVerticesAndPartitions(schedulingTopology, partitionTypes, inputDependencyConstraints,
+			vertices, resultPartitionIds, partitions);
+	}
+
+	static void initVerticesAndPartitions(
+			TestingSchedulingTopology schedulingTopology,
+			ResultPartitionType[] partitionTypes,
+			InputDependencyConstraint[] inputDependencyConstraints,
+			TestingSchedulingExecutionVertex[][] vertices,
+			ResultPartitionID[][] resultPartitionIds,
+			TestingSchedulingResultPartition[][] partitions) {
+
+		final int jobVertexCnt = vertices.length;
+		final int taskCnt = vertices[0].length;
+
+		JobVertexID[] jobVertexIds = new JobVertexID[jobVertexCnt];
+		IntermediateDataSetID[] dataSetIds = new IntermediateDataSetID[jobVertexCnt];
+		for (int i = 0; i < jobVertexCnt; i++) {
+			jobVertexIds[i] = new JobVertexID();
+			dataSetIds[i] = new IntermediateDataSetID();
+			schedulingTopology.addInputDependencyConstraint(jobVertexIds[i], inputDependencyConstraints[i]);
+		}
+
+		for (int i = 0; i < jobVertexCnt; i++) {
+			for (int j = 0; j < taskCnt; j++) {
+				vertices[i][j] = new TestingSchedulingExecutionVertex(jobVertexIds[i], j);
+				schedulingTopology.addSchedulingExecutionVertex(vertices[i][j]);
+
+				if (i != jobVertexCnt - 1) {
+					partitions[i][j] = new TestingSchedulingResultPartition(dataSetIds[i], new IntermediateResultPartitionID(),
+						partitionTypes[i], vertices[i][j]);
+					resultPartitionIds[i][j] = new ResultPartitionID(partitions[i][j].getId(), new ExecutionAttemptID());
+					schedulingTopology.addResultPartition(partitions[i][j].getId(), partitions[i][j]);

Review comment:
The method `addResultPartition` should not exist. New partitions should only be added to the topology via `SchedulingExecutionVertex`.
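Here is a minimal sketch of the design the reviewer asks for, using hypothetical test doubles rather than Flink's `TestingSchedulingTopology`. A partition can only be created through its producing vertex, and the topology finds partitions by walking its vertices. There is therefore no separate `addResultPartition` that could register a partition without a producer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical test doubles; not Flink's TestingSchedulingTopology or its vertex/partition classes.
final class SketchPartition {
    final String id;
    final SketchVertex producer;

    SketchPartition(String id, SketchVertex producer) {
        this.id = id;
        this.producer = producer;
    }
}

final class SketchVertex {
    final String id;
    private final List<SketchPartition> produced = new ArrayList<>();

    SketchVertex(String id) {
        this.id = id;
    }

    // The only way to create a partition, so every partition has a producer.
    SketchPartition addProducedPartition(String partitionId) {
        SketchPartition partition = new SketchPartition(partitionId, this);
        produced.add(partition);
        return partition;
    }

    List<SketchPartition> getProducedPartitions() {
        return produced;
    }
}

final class SketchTopology {
    private final Map<String, SketchVertex> vertices = new LinkedHashMap<>();

    void addVertex(SketchVertex vertex) {
        vertices.put(vertex.id, vertex);
    }

    // No addResultPartition(): partitions are looked up through the vertices that produce them.
    Optional<SketchPartition> getPartition(String partitionId) {
        return vertices.values().stream()
                .flatMap(vertex -> vertex.getProducedPartitions().stream())
                .filter(partition -> partition.id.equals(partitionId))
                .findFirst();
    }
}
```

Because the lookup is derived from the vertices, a partition added to a vertex after the vertex joined the topology is still found. A separate partition map would have to be kept in sync by hand.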
[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r282893011

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java ##
@@ -280,6 +316,55 @@
 */
 void sqlUpdate(String stmt, QueryConfig config);
+ /**
+* Gets the current default catalog name of the current session.
+*
+* @return the current default catalog that is used for path resolution
+* @see TableEnvironment#setCurrentCatalog(String)
+*/
+ String getCurrentCatalogName();

Review comment:
Nit: I wonder if we should just name it `getCurrentCatalog()`, returning the name of the current catalog. The same goes for the getter of the current database name. I understand that the given name is more explicit, but `getCurrentCatalog()` mirrors the set method below. It is also more consistent with the catalog APIs, in which every getter (table, database) is named `getXxx()` rather than `getXxxName()`.
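Here is the naming convention under discussion, sketched on a hypothetical session-state holder. The getter returns a name and mirrors the setter that takes a name. `SessionCatalogState`, `registerCatalog`, and the catalog names used in the example are illustrative only, not the `TableEnvironment` or `CatalogManager` API.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical holder illustrating getter/setter symmetry: both sides deal in catalog names.
final class SessionCatalogState {
    private final Set<String> registeredCatalogs = new LinkedHashSet<>();
    private String currentCatalog;

    SessionCatalogState(String defaultCatalog) {
        registeredCatalogs.add(defaultCatalog);
        currentCatalog = defaultCatalog;
    }

    void registerCatalog(String name) {
        registeredCatalogs.add(name);
    }

    // Takes a name ...
    void setCurrentCatalog(String name) {
        if (!registeredCatalogs.contains(name)) {
            throw new IllegalArgumentException("Unknown catalog: " + name);
        }
        currentCatalog = name;
    }

    // ... and the mirrored getter returns a name, without a "Name" suffix.
    String getCurrentCatalog() {
        return currentCatalog;
    }
}
```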
[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r282892019

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java ##

```diff
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+
+/**
+ * Strategy test utilities.
+ */
+public class StrategyTestUtil {
+
+	static void initVerticesAndPartitions(
+			TestingSchedulingTopology schedulingTopology,
+			ResultPartitionType[] partitionTypes,
+			InputDependencyConstraint[] inputDependencyConstraints,
+			TestingSchedulingExecutionVertex[][] vertices,
+			TestingSchedulingResultPartition[][] partitions) {
+
+		ResultPartitionID[][] resultPartitionIds = new ResultPartitionID[vertices.length - 1][vertices[0].length];
+		initVerticesAndPartitions(schedulingTopology, partitionTypes, inputDependencyConstraints,
+			vertices, resultPartitionIds, partitions);
+	}
+
+	static void initVerticesAndPartitions(
+			TestingSchedulingTopology schedulingTopology,
+			ResultPartitionType[] partitionTypes,
+			InputDependencyConstraint[] inputDependencyConstraints,
+			TestingSchedulingExecutionVertex[][] vertices,
+			ResultPartitionID[][] resultPartitionIds,
+			TestingSchedulingResultPartition[][] partitions) {
+
+		final int jobVertexCnt = vertices.length;
+		final int taskCnt = vertices[0].length;
+
+		JobVertexID[] jobVertexIds = new JobVertexID[jobVertexCnt];
+		IntermediateDataSetID[] dataSetIds = new IntermediateDataSetID[jobVertexCnt];
+		for (int i = 0; i < jobVertexCnt; i++) {
+			jobVertexIds[i] = new JobVertexID();
+			dataSetIds[i] = new IntermediateDataSetID();
+			schedulingTopology.addInputDependencyConstraint(jobVertexIds[i], inputDependencyConstraints[i]);
+		}
+
+		for (int i = 0; i < jobVertexCnt; i++) {
+			for (int j = 0; j < taskCnt; j++) {
+				vertices[i][j] = new TestingSchedulingExecutionVertex(jobVertexIds[i], j);
+				schedulingTopology.addSchedulingExecutionVertex(vertices[i][j]);
+
+				if (i != jobVertexCnt - 1) {
+					partitions[i][j] = new TestingSchedulingResultPartition(dataSetIds[i], new IntermediateResultPartitionID(),
+						partitionTypes[i], vertices[i][j]);
+					resultPartitionIds[i][j] = new ResultPartitionID(partitions[i][j].getId(), new ExecutionAttemptID());
+					schedulingTopology.addResultPartition(partitions[i][j].getId(), partitions[i][j]);
```

Review comment:
The method `addResultPartition` should not exist. It should only be possible to add new partitions to the topology via `SchedulingExecutionVertex`. Otherwise we risk being able to create _"impossible"_ topologies.
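The invariant the reviewer asks for (a partition enters the topology only through the vertex that produces it) can be sketched with simplified, hypothetical classes. `SketchVertex`, `SketchVertex.Partition` and `SketchTopology` are not Flink's `TestingSchedulingTopology` API; they only illustrate the design. The partition constructor is private, so only its producing vertex can create it, and the topology has no `addResultPartition`-style method at all.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified classes (not Flink's scheduling test utilities).
final class SketchVertex {

    /** A result partition; the private constructor means only SketchVertex can create one. */
    static final class Partition {
        final String id;
        final SketchVertex producer;

        private Partition(String id, SketchVertex producer) {
            this.id = id;
            this.producer = producer;
        }
    }

    final String id;
    private final List<Partition> producedPartitions = new ArrayList<>();

    SketchVertex(String id) {
        this.id = id;
    }

    /** The only way to create a partition: it is always bound to this producer. */
    Partition producePartition(String partitionId) {
        Partition partition = new Partition(partitionId, this);
        producedPartitions.add(partition);
        return partition;
    }

    List<Partition> getProducedPartitions() {
        return Collections.unmodifiableList(producedPartitions);
    }
}

/** Topology with addVertex but no addResultPartition; partitions are found via their producers. */
final class SketchTopology {

    private final Map<String, SketchVertex> vertices = new LinkedHashMap<>();

    void addVertex(SketchVertex vertex) {
        vertices.put(vertex.id, vertex);
    }

    /** Returns the partition with the given id, or null if no registered vertex produces it. */
    SketchVertex.Partition findPartition(String partitionId) {
        for (SketchVertex vertex : vertices.values()) {
            for (SketchVertex.Partition partition : vertex.getProducedPartitions()) {
                if (partition.id.equals(partitionId)) {
                    return partition;
                }
            }
        }
        return null;
    }
}
```

Because partitions are reached only through registered vertices, a partition whose producer is missing from the topology simply cannot be looked up, which rules out the "impossible" topologies mentioned in the review.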
[jira] [Commented] (FLINK-12428) Translate the "Event Time" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-12428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837302#comment-16837302 ]

jack commented on FLINK-12428:

Have you finished the task?

> Translate the "Event Time" page into Chinese
>
> Key: FLINK-12428
> URL: https://issues.apache.org/jira/browse/FLINK-12428
> Project: Flink
> Issue Type: Sub-task
> Components: chinese-translation, Documentation
> Affects Versions: 1.9.0
> Reporter: YangFei
> Assignee: jack
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
>
> File location: flink/docs/dev/event_time.zh.md
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html]

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
GJL commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r282887147

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java ##

```diff
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.api.common.InputDependencyConstraint.ANY;
+import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
+import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
+import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.connectVerticesToPartition;
+import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.getExecutionVertexIdsFromDeployOptions;
+import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.initVerticesAndPartitions;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for {@link LazyFromSourcesSchedulingStrategy}.
+ */
+public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {
+
+	/**
+	 * Tests that when start scheduling lazy from sources scheduling strategy will start input vertices in scheduling topology.
+	 */
+	@Test
+	public void testStartScheduling() {
+		final int jobVertexCnt = 2;
+		final int taskCnt = 3;
+		TestingSchedulingTopology schedulingTopology = new TestingSchedulingTopology();
+		TestingSchedulingExecutionVertex[][] vertices = new TestingSchedulingExecutionVertex[jobVertexCnt][taskCnt];
+		TestingSchedulingResultPartition[][] partitions = new TestingSchedulingResultPartition[jobVertexCnt - 1][taskCnt];
+
+		ResultPartitionType[] partitionTypes = new ResultPartitionType[jobVertexCnt - 1];
+		InputDependencyConstraint[] inputDependencyConstraints = new InputDependencyConstraint[jobVertexCnt];
+		partitionTypes[0] = BLOCKING;
+		inputDependencyConstraints[0] = ANY;
+		inputDependencyConstraints[1] = ANY;
+
+		initVerticesAndPartitions(schedulingTopology, partitionTypes, inputDependencyConstraints, vertices, partitions);
+
+		for (int i = 0; i < taskCnt; i++) {
+			for (int j = 0; j < taskCnt; j++) {
+				connectVerticesToPartition(vertices[0][i], vertices[1][i], partitions[0][j]);
+			}
+		}
+
+		TestingSchedulerOperations testingSchedulerOperation = new TestingSchedulerOperations();
+		LazyFromSourcesSchedulingStrategy schedulingStrategy = new LazyFromSourcesSchedulingStrategy(
+			testingSchedulerOperation,
+			schedulingTopology);
+
+		schedulingStrategy.startScheduling();
+
+		Set<ExecutionVertexID> toBeScheduledVertices = Arrays.stream(vertices[0])
+			.map(TestingSchedulingExecutionVertex::getId).collect(Collectors.toSet());
+		Collection<ExecutionVertexDeploymentOption> scheduledVertices = testingSchedulerOperation.getScheduledVertices().get(0);
+
+		assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices),
+			containsInAnyOrder(toBeScheduledVertices.toArray()));
+	}
+
+	/**
+	 * Tests that when restart tasks
```
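`testStartScheduling` asserts that, on start, exactly the first-layer (source) vertices are scheduled. The rule being tested can be sketched in plain Java under a simplified, hypothetical model (`LazyFromSourcesSketch` is not Flink's `LazyFromSourcesSchedulingStrategy`): each vertex id maps to the ids of the vertices it consumes from, and only vertices that consume nothing are scheduled at start. The `allToAll` helper builds a graph of similar shape to the test, where every second-layer vertex consumes from every first-layer vertex.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model (not Flink's scheduler): vertex id -> ids of
// the vertices whose partitions it consumes.
final class LazyFromSourcesSketch {

    private LazyFromSourcesSketch() {
    }

    /** On start, schedule only the vertices that consume nothing, i.e. the sources. */
    static List<String> verticesToScheduleOnStart(Map<String, List<String>> inputsByVertex) {
        List<String> sources = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : inputsByVertex.entrySet()) {
            if (entry.getValue().isEmpty()) {
                sources.add(entry.getKey());
            }
        }
        return sources;
    }

    /** Two layers of taskCnt vertices; every second-layer vertex consumes every first-layer vertex. */
    static Map<String, List<String>> allToAll(int taskCnt) {
        Map<String, List<String>> graph = new LinkedHashMap<>();
        List<String> sourceIds = new ArrayList<>();
        for (int i = 0; i < taskCnt; i++) {
            graph.put("source-" + i, Collections.emptyList());
            sourceIds.add("source-" + i);
        }
        for (int i = 0; i < taskCnt; i++) {
            graph.put("sink-" + i, Collections.unmodifiableList(sourceIds));
        }
        return graph;
    }
}
```

The downstream vertices are left for later: in a lazy-from-sources strategy they become schedulable only as their inputs are produced, which this sketch does not model.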
[GitHub] [flink] flinkbot commented on issue #8414: [FLINK-12477][Build] Enforce maven version 3.1.1
flinkbot commented on issue #8414: [FLINK-12477][Build] Enforce maven version 3.1.1 URL: https://github.com/apache/flink/pull/8414#issuecomment-491294860

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-12477) Change threading-model in StreamTask to a mailbox-based approach
[ https://issues.apache.org/jira/browse/FLINK-12477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-12477:

Labels: pull-request-available (was: )

> Change threading-model in StreamTask to a mailbox-based approach
>
> Key: FLINK-12477
> URL: https://issues.apache.org/jira/browse/FLINK-12477
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Operators
> Reporter: Stefan Richter
> Priority: Major
> Labels: pull-request-available
>
> This is the umbrella issue to change the threading-model in StreamTask to a mailbox-based approach.
> You can find the corresponding design doc here: https://docs.google.com/document/d/1eDpsUKv2FqwZiS1Pm6gYO5eFHScBHfULKmH1-ZEWB4g
> Please comment and ask first before assigning a sub-task. Thanks!
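As a rough illustration of what a "mailbox-based" threading model means in general, here is a generic single-threaded mailbox loop in Java. All names (`MailboxSketch`, `put`, `runMailboxLoop`) are hypothetical, and this is not the design in the linked document: any thread may enqueue actions ("letters"), but only the thread running the loop executes them, one at a time, between steps of a default action such as processing one input record.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.BooleanSupplier;

// Generic mailbox pattern with hypothetical names; not Flink's StreamTask design.
final class MailboxSketch {

    private final Queue<Runnable> mailbox = new ArrayDeque<>();

    /** May be called from any thread; the action runs later on the loop thread. */
    synchronized void put(Runnable letter) {
        mailbox.add(letter);
    }

    private synchronized Runnable tryTake() {
        return mailbox.poll();
    }

    /**
     * Runs on a single thread until defaultAction returns false. Before each
     * step of the default action, all pending letters are executed in order.
     */
    void runMailboxLoop(BooleanSupplier defaultAction) {
        boolean more = true;
        while (more) {
            Runnable letter;
            while ((letter = tryTake()) != null) {
                letter.run();
            }
            more = defaultAction.getAsBoolean();
        }
    }
}
```

In this pattern only the loop thread touches the task's state, so actions such as checkpoints or timers need no lock on that state; the trade-off is that a long-running step of the default action delays any pending mail.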
[GitHub] [flink] rmetzger opened a new pull request #8414: [FLINK-12477][Build] Enforce maven version 3.1.1
rmetzger opened a new pull request #8414: [FLINK-12477][Build] Enforce maven version 3.1.1 URL: https://github.com/apache/flink/pull/8414

## Contribution Checklist

## What is the purpose of the change

The `frontend-maven-plugin` introduced in https://github.com/apache/flink/pull/8016 requires at least Maven 3.1. With this change, we enforce the latest stable release of Maven 3.1.
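The PR description does not show how the minimum version is enforced, so the following is only an illustration of what "at least Maven 3.1.1" means as a comparison. `MavenVersionCheck` is a hypothetical helper, not part of Maven or Flink. It compares purely numeric dotted versions component by component, treating missing components as 0, and does not handle qualifiers such as `-beta`.

```java
// Hypothetical helper: compares purely numeric dotted versions component by
// component (missing components count as 0). Qualifiers like "-beta" are not handled.
final class MavenVersionCheck {

    private MavenVersionCheck() {
    }

    /** Returns a negative, zero, or positive value as a is lower than, equal to, or higher than b. */
    static int compare(String a, String b) {
        String[] x = a.split("\\.");
        String[] y = b.split("\\.");
        int n = Math.max(x.length, y.length);
        for (int i = 0; i < n; i++) {
            int xi = i < x.length ? Integer.parseInt(x[i]) : 0;
            int yi = i < y.length ? Integer.parseInt(y[i]) : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    /** True if the actual version is at least the required minimum. */
    static boolean satisfiesMinimum(String actual, String minimum) {
        return compare(actual, minimum) >= 0;
    }
}
```

Under this rule, 3.0.5 and 3.1 (that is, 3.1.0) fall below the 3.1.1 minimum. Comparing components numerically matters: a plain string comparison would rank "3.10.0" below "3.9.0".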
[GitHub] [flink] fhueske commented on a change in pull request #8354: [FLINK-12421][docs-zh] Synchronize the latest documentation changes (commits to 7b46f604) into Chinese documents
fhueske commented on a change in pull request #8354: [FLINK-12421][docs-zh] Synchronize the latest documentation changes (commits to 7b46f604) into Chinese documents URL: https://github.com/apache/flink/pull/8354#discussion_r282885332

## File path: docs/dev/connectors/kafka.zh.md ##

```diff
@@ -88,7 +88,7 @@ For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is
 >= 1.0.0
 
 This universal Kafka connector attempts to track the latest version of the Kafka client.
-The version of the client it uses may change between Flink releases.
+The version of the client it uses may change between Flink releases. As of this release, it uses the Kafka 2.2.0 client.
```

Review comment:
I think we should not reference `As of this release`. In 6 months nobody knows what this refers to. Could you replace that with the correct version (presumably 1.8) in both languages?
[jira] [Assigned] (FLINK-12428) Translate the "Event Time" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-12428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jack reassigned FLINK-12428:

Assignee: jack (was: YangFei)

> Translate the "Event Time" page into Chinese
>
> Key: FLINK-12428
> URL: https://issues.apache.org/jira/browse/FLINK-12428
> Project: Flink
> Issue Type: Sub-task
> Components: chinese-translation, Documentation
> Affects Versions: 1.9.0
> Reporter: YangFei
> Assignee: jack
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
>
> File location: flink/docs/dev/event_time.zh.md
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html]
[jira] [Created] (FLINK-12490) Introduce NetworkInput class
Piotr Nowojski created FLINK-12490:

Summary: Introduce NetworkInput class
Key: FLINK-12490
URL: https://issues.apache.org/jira/browse/FLINK-12490
Project: Flink
Issue Type: Sub-task
Components: Runtime / Network
Affects Versions: 1.8.0
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski

NetworkInput will take care of record deserialization and barrier alignment. It will be used by the InputProcessors (OneInput, TwoInput and TwoInputSelectable) to access buffers from InputGates.
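The issue only states that NetworkInput will handle record deserialization and barrier alignment. To illustrate the alignment part, here is a simplified sketch of the alignment used for exactly-once checkpoints. `BarrierAlignmentSketch` and its string-based channel model are hypothetical and unrelated to Flink's actual classes: a channel that has delivered a barrier is blocked, so its later records are held back, until every channel has delivered its barrier; then a checkpoint is emitted and all channels are unblocked.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of barrier alignment (not Flink's NetworkInput).
// Each channel is a queue of elements; an element is a record or the BARRIER marker.
final class BarrierAlignmentSketch {

    static final String BARRIER = "<barrier>";

    private BarrierAlignmentSketch() {
    }

    /** Consumes the channels round-robin (mutating them) and returns the processed output. */
    static List<String> process(List<Deque<String>> channels) {
        List<String> output = new ArrayList<>();
        boolean[] blocked = new boolean[channels.size()];
        int blockedCount = 0;
        boolean progress = true;
        while (progress) {
            progress = false;
            for (int c = 0; c < channels.size(); c++) {
                // Blocked channels are not read: their records wait for the checkpoint
                if (blocked[c] || channels.get(c).isEmpty()) {
                    continue;
                }
                String element = channels.get(c).poll();
                progress = true;
                if (BARRIER.equals(element)) {
                    blocked[c] = true;
                    blockedCount++;
                    if (blockedCount == channels.size()) {
                        // Barriers from all channels arrived: checkpoint, then unblock
                        output.add("checkpoint");
                        Arrays.fill(blocked, false);
                        blockedCount = 0;
                    }
                } else {
                    output.add(element);
                }
            }
        }
        return output;
    }
}
```

For channels `a1, <barrier>, a2` and `b1, b2, <barrier>, b3`, record `a2` is only processed after the checkpoint even though it is available earlier. If a channel ends without delivering a barrier, this sketch simply stops with the other channels still blocked; a real implementation has to handle that case.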
[GitHub] [flink] GJL closed pull request #8130: [FLINK-12053][runtime] Calculate job vertex parallelism based on metrics
GJL closed pull request #8130: [FLINK-12053][runtime] Calculate job vertex parallelism based on metrics URL: https://github.com/apache/flink/pull/8130