[GitHub] [flink] flinkbot edited a comment on pull request #12507: [FLINK-18162][connector/common] Serialize the splits in the AddSplitsEvent
flinkbot edited a comment on pull request #12507: URL: https://github.com/apache/flink/pull/12507#issuecomment-639959577 ## CI report: * 90d421fc94e29c0af0fa563661b9dd8f2a388b1b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2855) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling
flinkbot edited a comment on pull request #12256: URL: https://github.com/apache/flink/pull/12256#issuecomment-631025695 ## CI report: * ca05fc241f6fd0550a39612b4ddef86b63a304a6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2625) * f181cf943442db6173eba4c817f40bbab49538da Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2857) * eba9c31264c2a0fe9f5bf955ba21de081bd1aa25 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #12303: [FLINK-17625] [table] Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction
flinkbot edited a comment on pull request #12303: URL: https://github.com/apache/flink/pull/12303#issuecomment-633242378 ## CI report: * 1c92b48de9d7b9068a646053e7dc9b4d076625ab Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2854)
[GitHub] [flink] flinkbot edited a comment on pull request #12508: [hotfix][state] Add error message to precondition in KeyGroupPartitio…
flinkbot edited a comment on pull request #12508: URL: https://github.com/apache/flink/pull/12508#issuecomment-639985108 ## CI report: * 8953e51d845b3a61608d6e08f9caf462e43775bc Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2856)
[GitHub] [flink] flinkbot edited a comment on pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling
flinkbot edited a comment on pull request #12256: URL: https://github.com/apache/flink/pull/12256#issuecomment-631025695 ## CI report: * ca05fc241f6fd0550a39612b4ddef86b63a304a6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2625) * f181cf943442db6173eba4c817f40bbab49538da UNKNOWN
[GitHub] [flink] zhuzhurk commented on a change in pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling
zhuzhurk commented on a change in pull request #12256: URL: https://github.com/apache/flink/pull/12256#discussion_r436239625 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest; +import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.FlinkException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This slot allocator will request one physical slot for each single execution vertex. + * The slots will be requested in bulks so that the {@link SlotProvider} can check + * whether this bulk of slot requests can be fulfilled at the same time. + * It has several limitations: + * + * 1. Slot sharing will be ignored. + * + * 2. Co-location constraints are not allowed. + * + * 3. Intra-bulk input location preferences will be ignored. 
+ */ +public class OneSlotPerExecutionSlotAllocator extends AbstractExecutionSlotAllocator { + + private static final Logger LOG = LoggerFactory.getLogger(OneSlotPerExecutionSlotAllocator.class); + + private final SlotProvider slotProvider; + + private final SlotOwner slotOwner; + + private final boolean slotWillBeOccupiedIndefinitely; + + private final Time allocationTimeout; + + public OneSlotPerExecutionSlotAllocator( + final SlotProvider slotProvider, + final InputsLocationsRetriever inputsLocationsRetriever, + final boolean slotWillBeOccupiedIndefinitely, + final Time allocationTimeout) { + + super(inputsLocationsRetriever); + + this.slotProvider = checkNotNull(slotProvider); + this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely; + this.allocationTimeout = checkNotNull(allocationTimeout); + + this.slotOwner = new OneSlotPerExecutionSlotAllocatorSlotOwner(); + } + + @Override + public List allocateSlotsFor( + final List executionVertexSchedulingRequirements) { + + validateSchedulingRequirements(executionVertexSchedulingRequirements); + + validateNoCoLocationConstraint(executionVertexSchedulingRequirements); + + final Set allExecutionVertexIds = executionVertexSchedulingRequirements.stream() + .map(ExecutionVertexSchedulingRequirements::getExecutionVertexId) + .collect(Collectors.toSet()); + + final Map executionVertexSlotRequestIds = + generateExecutionVertexSlotRequestIds(allExecutionVertexIds); + + final List slotExecutionVertexAssignments = + createSlotExecutionVertexAssignments(executionVertexSchedulingReq
[GitHub] [flink] zhuzhurk commented on a change in pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling
zhuzhurk commented on a change in pull request #12256: URL: https://github.com/apache/flink/pull/12256#discussion_r436239133 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java ## @@ -200,14 +203,16 @@ private SlotExecutionVertexAssignment findSlotAssignmentByExecutionVertexId( executionVertexId))); } - private static class AllocationToggableSlotProvider implements SlotProvider { + static class AllocationToggableSlotProvider implements SlotProvider { private final List> slotAllocationRequests = new ArrayList<>(); private final List cancelledSlotRequestIds = new ArrayList<>(); private boolean slotAllocationDisabled; + private boolean forceFailingSlotAllocation = false; Review comment: `OneSlotPerExecutionSlotAllocatorTest` now has its own `TestingBulkSlotProvider` which does not have 3 modes. ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java ## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Factory for {@link OneSlotPerExecutionSlotAllocator}. + */ +public class OneSlotPerExecutionSlotAllocatorFactory implements ExecutionSlotAllocatorFactory { + + private final SlotProvider slotProvider; + + private final boolean slotWillBeOccupiedIndefinitely; + + private final Time allocationTimeout; + + public OneSlotPerExecutionSlotAllocatorFactory( Review comment: done.
[GitHub] [flink] zhuzhurk commented on a change in pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling
zhuzhurk commented on a change in pull request #12256: URL: https://github.com/apache/flink/pull/12256#discussion_r436239010 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java ## @@ -189,7 +192,7 @@ private DefaultExecutionSlotAllocator createExecutionSlotAllocator(InputsLocatio return schedulingRequirements; } - private SlotExecutionVertexAssignment findSlotAssignmentByExecutionVertexId( + static SlotExecutionVertexAssignment findSlotAssignmentByExecutionVertexId( Review comment: Agreed. Added util classes in 216a75a34f2d090d726673dc272ea2801fa6eef8 and 277ee720dee5ab63dade29c7178d9e7af69a3f21
[GitHub] [flink] zhuzhurk commented on a change in pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling
zhuzhurk commented on a change in pull request #12256: URL: https://github.com/apache/flink/pull/12256#discussion_r436238944 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java ## @@ -216,13 +221,37 @@ private SlotExecutionVertexAssignment findSlotAssignmentByExecutionVertexId( Time timeout) { slotAllocationRequests.add(Tuple3.of(slotRequestId, task, slotProfile)); - if (slotAllocationDisabled) { + if (forceFailingSlotAllocation) { + return FutureUtils.completedExceptionally(new Exception("Forced failure")); + } else if (slotAllocationDisabled) { return new CompletableFuture<>(); } else { return CompletableFuture.completedFuture(new TestingLogicalSlotBuilder().createTestingLogicalSlot()); } } + public CompletableFuture> allocatePhysicalSlots( Review comment: Yes, `OneSlotPerExecutionSlotAllocatorTest` is reworked now and does not use `AllocationToggableBulkSlotProvider` anymore.
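The three branches in the quoted hunk (forced failure, allocation disabled, immediate success) can be sketched in isolation with plain `CompletableFuture`s. This is only a minimal illustration, not Flink's test code: `ToggleableProviderSketch`, `Mode`, and `allocate` are hypothetical names, and a `String` stands in for the slot type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a test slot provider with three modes; not Flink API. */
class ToggleableProviderSketch {

    enum Mode { SUCCEED, PENDING, FAIL }

    // Every request is recorded, regardless of the mode, so tests can inspect it later.
    private final List<String> requests = new ArrayList<>();
    private Mode mode = Mode.SUCCEED;

    void setMode(Mode mode) {
        this.mode = mode;
    }

    List<String> getRequests() {
        return requests;
    }

    CompletableFuture<String> allocate(String requestId) {
        requests.add(requestId);
        switch (mode) {
            case FAIL: {
                // Corresponds to the "forced failure" branch: fail immediately.
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(new Exception("Forced failure"));
                return failed;
            }
            case PENDING:
                // Corresponds to "allocation disabled": the future never completes on its own.
                return new CompletableFuture<>();
            default:
                // Default branch: hand out a (fake) slot right away.
                return CompletableFuture.completedFuture("slot-for-" + requestId);
        }
    }

    public static void main(String[] args) {
        ToggleableProviderSketch provider = new ToggleableProviderSketch();
        provider.setMode(Mode.FAIL);
        System.out.println(provider.allocate("request-1").isCompletedExceptionally());
    }
}
```

A single enum makes the three states mutually exclusive, whereas two independent boolean flags need an explicit precedence order like the `if`/`else if` chain in the hunk.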
[GitHub] [flink] zhuzhurk commented on a change in pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling
zhuzhurk commented on a change in pull request #12256: URL: https://github.com/apache/flink/pull/12256#discussion_r436238809 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest; +import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.FlinkException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This slot allocator will request one physical slot for each single execution vertex. + * The slots will be requested in bulks so that the {@link SlotProvider} can check + * whether this bulk of slot requests can be fulfilled at the same time. + * It has several limitations: + * + * 1. Slot sharing will be ignored. + * + * 2. Co-location constraints are not allowed. + * + * 3. Intra-bulk input location preferences will be ignored. 
+ */ +public class OneSlotPerExecutionSlotAllocator extends AbstractExecutionSlotAllocator { + + private static final Logger LOG = LoggerFactory.getLogger(OneSlotPerExecutionSlotAllocator.class); + + private final SlotProvider slotProvider; + + private final SlotOwner slotOwner; + + private final boolean slotWillBeOccupiedIndefinitely; + + private final Time allocationTimeout; + + public OneSlotPerExecutionSlotAllocator( + final SlotProvider slotProvider, Review comment: It's done to make `OneSlotPerExecutionSlotAllocator` depend on `BulkSlotProvider`. I think `cancelSlotRequest` may be better because `SlotPoolImpl#releaseSlot()` also cancels pending physical slot request. ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.SlotPro
[GitHub] [flink] zhuzhurk commented on a change in pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling
zhuzhurk commented on a change in pull request #12256: URL: https://github.com/apache/flink/pull/12256#discussion_r436238884 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest; +import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.FlinkException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This slot allocator will request one physical slot for each single execution vertex. + * The slots will be requested in bulks so that the {@link SlotProvider} can check + * whether this bulk of slot requests can be fulfilled at the same time. + * It has several limitations: + * + * 1. Slot sharing will be ignored. + * + * 2. Co-location constraints are not allowed. + * + * 3. Intra-bulk input location preferences will be ignored. 
+ */ +public class OneSlotPerExecutionSlotAllocator extends AbstractExecutionSlotAllocator { + + private static final Logger LOG = LoggerFactory.getLogger(OneSlotPerExecutionSlotAllocator.class); + + private final SlotProvider slotProvider; + + private final SlotOwner slotOwner; + + private final boolean slotWillBeOccupiedIndefinitely; + + private final Time allocationTimeout; + + public OneSlotPerExecutionSlotAllocator( + final SlotProvider slotProvider, + final InputsLocationsRetriever inputsLocationsRetriever, + final boolean slotWillBeOccupiedIndefinitely, + final Time allocationTimeout) { + + super(inputsLocationsRetriever); + + this.slotProvider = checkNotNull(slotProvider); + this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely; + this.allocationTimeout = checkNotNull(allocationTimeout); + + this.slotOwner = new OneSlotPerExecutionSlotAllocatorSlotOwner(); + } + + @Override + public List allocateSlotsFor( + final List executionVertexSchedulingRequirements) { + + validateSchedulingRequirements(executionVertexSchedulingRequirements); + + validateNoCoLocationConstraint(executionVertexSchedulingRequirements); + + final Set allExecutionVertexIds = executionVertexSchedulingRequirements.stream() + .map(ExecutionVertexSchedulingRequirements::getExecutionVertexId) + .collect(Collectors.toSet()); + + final Map executionVertexSlotRequestIds = + generateExecutionVertexSlotRequestIds(allExecutionVertexIds); + + final List slotExecutionVertexAssignments = + createSlotExecutionVertexAssignments(executionVertexSchedulingReq
[GitHub] [flink] flinkbot commented on pull request #12508: [hotfix][state] Add error message to precondition in KeyGroupPartitio…
flinkbot commented on pull request #12508: URL: https://github.com/apache/flink/pull/12508#issuecomment-639985108 ## CI report: * 8953e51d845b3a61608d6e08f9caf462e43775bc UNKNOWN
[GitHub] [flink] zhuzhurk commented on a change in pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling
zhuzhurk commented on a change in pull request #12256: URL: https://github.com/apache/flink/pull/12256#discussion_r436238809 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest; +import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.FlinkException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This slot allocator will request one physical slot for each single execution vertex. + * The slots will be requested in bulks so that the {@link SlotProvider} can check + * whether this bulk of slot requests can be fulfilled at the same time. + * It has several limitations: + * + * 1. Slot sharing will be ignored. + * + * 2. Co-location constraints are not allowed. + * + * 3. Intra-bulk input location preferences will be ignored. 
+ */ +public class OneSlotPerExecutionSlotAllocator extends AbstractExecutionSlotAllocator { + + private static final Logger LOG = LoggerFactory.getLogger(OneSlotPerExecutionSlotAllocator.class); + + private final SlotProvider slotProvider; + + private final SlotOwner slotOwner; + + private final boolean slotWillBeOccupiedIndefinitely; + + private final Time allocationTimeout; + + public OneSlotPerExecutionSlotAllocator( + final SlotProvider slotProvider, Review comment: It's done to make `OneSlotPerExecutionSlotAllocator` depend on `BulkSlotProvider`. I think `cancelSlotRequest` may be better because it also cancels the pending physical slot request.
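The Javadoc quoted above says the slots are requested in bulk so that the provider can check whether the whole bulk can be fulfilled together, and the review comment concerns cancelling pending requests. A rough, self-contained sketch of that all-or-nothing idea follows. It assumes nothing about how Flink's `BulkSlotProvider` is actually implemented: `BulkRequestSketch` and `allOrNothing` are hypothetical names, and the requests are plain `CompletableFuture`s.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical all-or-nothing bulk request helper; not Flink API. */
class BulkRequestSketch {

    /**
     * Returns a future that completes with all results once every request has completed,
     * and that cancels the still-pending requests as soon as any one of them fails.
     */
    static <T> CompletableFuture<List<T>> allOrNothing(List<CompletableFuture<T>> requests) {
        CompletableFuture<List<T>> bulk = new CompletableFuture<>();

        for (CompletableFuture<T> request : requests) {
            request.whenComplete((value, error) -> {
                // Only the first failure wins; it fails the bulk and cancels the rest.
                if (error != null && bulk.completeExceptionally(error)) {
                    for (CompletableFuture<T> other : requests) {
                        other.cancel(false); // no-op for requests that already completed
                    }
                }
            });
        }

        // Runs only if every request completed normally.
        CompletableFuture.allOf(requests.toArray(new CompletableFuture<?>[0]))
            .thenRun(() -> {
                List<T> results = new ArrayList<>();
                for (CompletableFuture<T> request : requests) {
                    results.add(request.join());
                }
                bulk.complete(results);
            });

        return bulk;
    }

    public static void main(String[] args) {
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        CompletableFuture<List<String>> bulk = allOrNothing(Arrays.asList(first, second));

        first.completeExceptionally(new RuntimeException("no slot available"));
        System.out.println(bulk.isCompletedExceptionally() + " " + second.isCancelled());
    }
}
```

Cancelling the sibling futures on the first failure mirrors the concern in the comment: a pending request that can no longer be part of a successful bulk should not stay outstanding.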
[GitHub] [flink] zhuzhurk commented on a change in pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling
zhuzhurk commented on a change in pull request #12256:
URL: https://github.com/apache/flink/pull/12256#discussion_r436237778

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java ##

@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This slot allocator will request one physical slot for each single execution vertex.
+ * The slots will be requested in bulks so that the {@link SlotProvider} can check
+ * whether this bulk of slot requests can be fulfilled at the same time.
+ * It has several limitations:
+ *
+ * 1. Slot sharing will be ignored.
+ *
+ * 2. Co-location constraints are not allowed.
+ *
+ * 3. Intra-bulk input location preferences will be ignored.
+ */
+public class OneSlotPerExecutionSlotAllocator extends AbstractExecutionSlotAllocator {
+
+	private static final Logger LOG = LoggerFactory.getLogger(OneSlotPerExecutionSlotAllocator.class);
+
+	private final SlotProvider slotProvider;
+
+	private final SlotOwner slotOwner;
+
+	private final boolean slotWillBeOccupiedIndefinitely;
+
+	private final Time allocationTimeout;
+
+	public OneSlotPerExecutionSlotAllocator(
+			final SlotProvider slotProvider,
+			final InputsLocationsRetriever inputsLocationsRetriever,
+			final boolean slotWillBeOccupiedIndefinitely,
+			final Time allocationTimeout) {
+
+		super(inputsLocationsRetriever);
+
+		this.slotProvider = checkNotNull(slotProvider);
+		this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+		this.allocationTimeout = checkNotNull(allocationTimeout);
+
+		this.slotOwner = new OneSlotPerExecutionSlotAllocatorSlotOwner();
+	}
+
+	@Override
+	public List allocateSlotsFor(
+			final List executionVertexSchedulingRequirements) {
+
+		validateSchedulingRequirements(executionVertexSchedulingRequirements);
+
+		validateNoCoLocationConstraint(executionVertexSchedulingRequirements);
+
+		final Set allExecutionVertexIds = executionVertexSchedulingRequirements.stream()
+			.map(ExecutionVertexSchedulingRequirements::getExecutionVertexId)
+			.collect(Collectors.toSet());
+
+		final Map executionVertexSlotRequestIds =
+			generateExecutionVertexSlotRequestIds(allExecutionVertexIds);
+
+		final List slotExecutionVertexAssignments =
+			createSlotExecutionVertexAssignments(executionVertexSchedulingReq
[GitHub] [flink] Tartarus0zm commented on pull request #12508: [hotfix][state] Add error message to precondition in KeyGroupPartitio…
Tartarus0zm commented on pull request #12508: URL: https://github.com/apache/flink/pull/12508#issuecomment-639981987 @zentol can you review this for me if you have time, thanks ~
[GitHub] [flink] wuchong commented on pull request #12508: [hotfix][state] Add error message to precondition in KeyGroupPartitio…
wuchong commented on pull request #12508: URL: https://github.com/apache/flink/pull/12508#issuecomment-639979473 cc @carp84 who is more familiar with state module.
[GitHub] [flink] flinkbot commented on pull request #12508: [hotfix][state] Add error message to precondition in KeyGroupPartitio…
flinkbot commented on pull request #12508:
URL: https://github.com/apache/flink/pull/12508#issuecomment-639978998

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Automated Checks

Last check on commit 8953e51d845b3a61608d6e08f9caf462e43775bc (Sat Jun 06 04:26:18 UTC 2020)

**Warnings:**

* No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] Tartarus0zm commented on pull request #12508: [hotfix][state] Add error message to precondition in KeyGroupPartitio…
Tartarus0zm commented on pull request #12508: URL: https://github.com/apache/flink/pull/12508#issuecomment-639978848 @wuchong can you review this for me if you have time, thanks ~
[GitHub] [flink] Tartarus0zm opened a new pull request #12508: [hotfix][state] Add error message to precondition in KeyGroupPartitio…
Tartarus0zm opened a new pull request #12508:
URL: https://github.com/apache/flink/pull/12508

## What is the purpose of the change

Add error message to precondition in KeyGroupPartitionedPriorityQueue. Avoid unclear exception information like

```
java.lang.ArrayIndexOutOfBoundsException: -49
	at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:174)
	at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:110)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerProcessingTimeTimer(InternalTimerServiceImpl.java:203)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerProcessingTimeTimer(WindowOperator.java:901)
	at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:36)
	at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:28)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:920)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:402)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.processRecord(OneInputStreamTask.java:204)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:196)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
	at java.lang.Thread.run(Thread.java:745)
```

## Brief change log

no

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not documented)
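The kind of check this pull request describes might look roughly like the sketch below. It is not the code from the pull request: `SubHeapLookupSketch`, its fields and the message text are hypothetical, and a plain `IllegalArgumentException` stands in for whatever precondition helper the real change uses. It only illustrates how an explicit range check can replace a bare `ArrayIndexOutOfBoundsException` with a message that names the offending key group.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stand-in for a key-group-partitioned queue: one sub-list per key group
 * in the range [firstKeyGroup, firstKeyGroup + totalKeyGroups). Not Flink's class.
 */
final class SubHeapLookupSketch {

    private final int firstKeyGroup;
    private final List<List<String>> subHeaps = new ArrayList<>();

    SubHeapLookupSketch(int firstKeyGroup, int totalKeyGroups) {
        this.firstKeyGroup = firstKeyGroup;
        for (int i = 0; i < totalKeyGroups; i++) {
            subHeaps.add(new ArrayList<>());
        }
    }

    /** Returns the sub-list for a key group, failing with a descriptive message if it is out of range. */
    List<String> subHeapFor(int keyGroupId) {
        final int index = keyGroupId - firstKeyGroup;
        if (index < 0 || index >= subHeaps.size()) {
            throw new IllegalArgumentException(String.format(
                "Key group %d is not in the range [%d, %d) handled by this queue.",
                keyGroupId, firstKeyGroup, firstKeyGroup + subHeaps.size()));
        }
        return subHeaps.get(index);
    }
}
```

With illustrative numbers only: a queue starting at key group 50 that is asked for key group 1 would compute index -49 on an unchecked array access, whereas the sketch reports the key group and the valid range.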
[GitHub] [flink] flinkbot edited a comment on pull request #12507: [FLINK-18162][connector/common] Serialize the splits in the AddSplitsEvent
flinkbot edited a comment on pull request #12507: URL: https://github.com/apache/flink/pull/12507#issuecomment-639959577 ## CI report: * 90d421fc94e29c0af0fa563661b9dd8f2a388b1b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2855)
[GitHub] [flink] flinkbot commented on pull request #12507: [FLINK-18162][connector/common] Serialize the splits in the AddSplitsEvent
flinkbot commented on pull request #12507: URL: https://github.com/apache/flink/pull/12507#issuecomment-639959577 ## CI report: * 90d421fc94e29c0af0fa563661b9dd8f2a388b1b UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #12303: [FLINK-17625] [table] Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction
flinkbot edited a comment on pull request #12303: URL: https://github.com/apache/flink/pull/12303#issuecomment-633242378 ## CI report: * 3f6b52073802835bc193796a41d095c5670f5d59 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2081) * 1c92b48de9d7b9068a646053e7dc9b4d076625ab Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2854)
[GitHub] [flink] becketqin commented on pull request #12507: [FLINK-18162][connector/common] Serialize the splits in the AddSplitsEvent
becketqin commented on pull request #12507: URL: https://github.com/apache/flink/pull/12507#issuecomment-639958697 @StephanEwen Could you help take a look? Thanks!
[GitHub] [flink] flinkbot commented on pull request #12507: [FLINK-18162][connector/common] Serialize the splits in the AddSplitsEvent
flinkbot commented on pull request #12507:
URL: https://github.com/apache/flink/pull/12507#issuecomment-639957918

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Automated Checks

Last check on commit 90d421fc94e29c0af0fa563661b9dd8f2a388b1b (Sat Jun 06 02:16:13 UTC 2020)

**Warnings:**

* No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] becketqin opened a new pull request #12507: [FLINK-18162][connector/common] Serialize the splits in the AddSplitsEvent
becketqin opened a new pull request #12507:
URL: https://github.com/apache/flink/pull/12507

## What is the purpose of the change

The `AddSplitsEvent` contains a list of `SourceSplit`s to be sent from the source coordinator to the source reader. Currently the `AddSplitsEvent` is declared serializable, but the `SourceSplit` is not, because we expect users to provide a split serializer. To make sure the `AddSplitsEvent` can actually be serialized, we need to use the user-provided split serializer and store the serialized splits in the `AddSplitsEvent` instead of the split objects.

## Brief change log

90d421fc94e29c0af0fa563661b9dd8f2a388b1b changes the `AddSplitsEvent` to save the serialized splits instead of the original instances.

## Verifying this change

Existing unit tests cover the change.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
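The idea in this description can be sketched as follows. This is a minimal illustration under stated assumptions, not the code in the pull request: `SplitSerializerSketch`, `SerializedSplitsEventSketch` and `StringSplitSerializer` are hypothetical names, and the real Flink interfaces may differ in shape. The event holds only `byte[]` payloads produced by a caller-supplied serializer, so the event itself can be `Serializable` even when the split type is not.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal serializer contract; not Flink's actual interface. */
interface SplitSerializerSketch<T> {
    byte[] serialize(T split);
    T deserialize(byte[] bytes);
}

/** Hypothetical event that stays Serializable because it stores only byte arrays. */
final class SerializedSplitsEventSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final ArrayList<byte[]> serializedSplits;

    private SerializedSplitsEventSketch(ArrayList<byte[]> serializedSplits) {
        this.serializedSplits = serializedSplits;
    }

    /** Sender side: serialize each split eagerly with the user-provided serializer. */
    static <T> SerializedSplitsEventSketch of(List<T> splits, SplitSerializerSketch<T> serializer) {
        ArrayList<byte[]> bytes = new ArrayList<>(splits.size());
        for (T split : splits) {
            bytes.add(serializer.serialize(split));
        }
        return new SerializedSplitsEventSketch(bytes);
    }

    /** Receiver side: rebuild the split objects with the same serializer. */
    <T> List<T> splits(SplitSerializerSketch<T> serializer) {
        List<T> result = new ArrayList<>(serializedSplits.size());
        for (byte[] bytes : serializedSplits) {
            result.add(serializer.deserialize(bytes));
        }
        return result;
    }
}

/** Toy serializer for String "splits", used only to exercise the sketch. */
final class StringSplitSerializer implements SplitSerializerSketch<String> {
    @Override
    public byte[] serialize(String split) {
        return split.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

In this sketch the receiver has to pass the same serializer to `splits(...)` that the sender used in `of(...)`; the event carries no type information of its own.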
[GitHub] [flink] flinkbot edited a comment on pull request #12303: [FLINK-17625] [table] Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction
flinkbot edited a comment on pull request #12303: URL: https://github.com/apache/flink/pull/12303#issuecomment-633242378 ## CI report: * 3f6b52073802835bc193796a41d095c5670f5d59 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2081) * 1c92b48de9d7b9068a646053e7dc9b4d076625ab UNKNOWN
[GitHub] [flink] lsyldliu commented on pull request #12303: [FLINK-17625] [table] Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction
lsyldliu commented on pull request #12303: URL: https://github.com/apache/flink/pull/12303#issuecomment-639951595 @wuchong Thanks for your review and I've addressed your comments.
[GitHub] [flink] TengHu commented on pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.
TengHu commented on pull request #12297: URL: https://github.com/apache/flink/pull/12297#issuecomment-639894888 @aljoscha Gentle ping on this.
[GitHub] [flink] flinkbot edited a comment on pull request #12506: [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.
flinkbot edited a comment on pull request #12506: URL: https://github.com/apache/flink/pull/12506#issuecomment-639676996 ## CI report: * a6e3fc1886259cb1b7efede2596c9b581df06099 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2851)
[GitHub] [flink] flinkbot edited a comment on pull request #12484: [FLINK-18127][tests] Streamline manual execution of Java E2E tests
flinkbot edited a comment on pull request #12484: URL: https://github.com/apache/flink/pull/12484#issuecomment-638836075 ## CI report: * 48ca964fb1f2350c6335ab4726553f38a4a4c1c0 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2850)
[GitHub] [flink] flinkbot edited a comment on pull request #12423: [FLINK-18034][runtime] Introduce PreferredLocationsRetriever
flinkbot edited a comment on pull request #12423: URL: https://github.com/apache/flink/pull/12423#issuecomment-636712044 ## CI report: * 979c16df9319f87d4b96ff32d7e55823d8e41551 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2849)
[GitHub] [flink] flinkbot edited a comment on pull request #12375: [FLINK-17017][runtime] Implements bulk allocation for physical slots
flinkbot edited a comment on pull request #12375: URL: https://github.com/apache/flink/pull/12375#issuecomment-635184560 ## CI report: * cd3fc98c034fdc61235d9109c05b4f55d7423746 UNKNOWN * a8902557952ea70746d27e88a392c74724784605 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2848)
[GitHub] [flink] flinkbot edited a comment on pull request #12460: [FLINK-18063][checkpointing] Fix the race condition for aborting current checkpoint in CheckpointBarrierUnaligner
flinkbot edited a comment on pull request #12460: URL: https://github.com/apache/flink/pull/12460#issuecomment-638102290 ## CI report: * 1d838c03d2b9f9744cae6fc03a919db72cc0efd9 UNKNOWN * 5729182fd15a074bebba7f878113781c2b119fd3 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2846)
[GitHub] [flink] flinkbot edited a comment on pull request #12505: [FLINK-17765] Simplify Flink stack traces
flinkbot edited a comment on pull request #12505: URL: https://github.com/apache/flink/pull/12505#issuecomment-639597366 ## CI report: * 9e1468b9780de1ceecae32707a836f05561a14b1 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2844)
[GitHub] [flink] flinkbot edited a comment on pull request #12423: [FLINK-18034][runtime] Introduce PreferredLocationsRetriever
flinkbot edited a comment on pull request #12423: URL: https://github.com/apache/flink/pull/12423#issuecomment-636712044 ## CI report: * 7a2dfb63f296d835193c2ded3ec2f1f868eaf92a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2845) * 979c16df9319f87d4b96ff32d7e55823d8e41551 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2849)
[GitHub] [flink] flinkbot edited a comment on pull request #12504: [FLINK-16350] Support Zookeeper 3.5 in test_ha_per_job_cluster_datastream.sh
flinkbot edited a comment on pull request #12504: URL: https://github.com/apache/flink/pull/12504#issuecomment-639551716 ## CI report: * 9e7ffdf88b132d86eba7dd516c9b16d1b705758a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2842)
[GitHub] [flink] flinkbot edited a comment on pull request #12503: [FLINK-18151][python] Resolve CWE22 problems in pyflink_gateway_server.py
flinkbot edited a comment on pull request #12503: URL: https://github.com/apache/flink/pull/12503#issuecomment-639444578 ## CI report: * ce4f8beceb4672b8daa260f1b484a5790b0ad719 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2839)
[GitHub] [flink] flinkbot edited a comment on pull request #12446: [FLINK-16225] Implement user class loading exception handler
flinkbot edited a comment on pull request #12446: URL: https://github.com/apache/flink/pull/12446#issuecomment-637647665 ## CI report: * 426791910cdcdc50824687d842a182f4036dbfc1 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2843)
[GitHub] [flink] flinkbot edited a comment on pull request #11586: [FLINK-5552][runtime] make JMXServer static per JVM
flinkbot edited a comment on pull request #11586: URL: https://github.com/apache/flink/pull/11586#issuecomment-606958289 ## CI report: * 0c0af972ac3f8477eba6c65cfe990a6af0105707 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2841)
[GitHub] [flink] flinkbot edited a comment on pull request #12484: [FLINK-18127][tests] Streamline manual execution of Java E2E tests
flinkbot edited a comment on pull request #12484: URL: https://github.com/apache/flink/pull/12484#issuecomment-638836075 ## CI report: * 683c24369ba03611b096aad5c7e9d634c131549b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2760) * 48ca964fb1f2350c6335ab4726553f38a4a4c1c0 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2850)
[GitHub] [flink] flinkbot edited a comment on pull request #12506: [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.
flinkbot edited a comment on pull request #12506: URL: https://github.com/apache/flink/pull/12506#issuecomment-639676996 ## CI report: * a6e3fc1886259cb1b7efede2596c9b581df06099 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2851)
[jira] [Reopened] (FLINK-17260) StreamingKafkaITCase failure on Azure
[ https://issues.apache.org/jira/browse/FLINK-17260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Metzger reopened FLINK-17260:

failed again on master (including your commit): https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=2834&view=logs&s=ae4f8708-9994-57d3-c2d7-b892156e7812&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee

> StreamingKafkaITCase failure on Azure
>
> Key: FLINK-17260
> URL: https://issues.apache.org/jira/browse/FLINK-17260
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Tests
> Affects Versions: 1.11.0, 1.12.0
> Reporter: Roman Khachatryan
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> [https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_apis/build/builds/7544/logs/165]
>
> {code:java}
> 2020-04-16T00:12:32.2848429Z [INFO] Running org.apache.flink.tests.util.kafka.StreamingKafkaITCase
> 2020-04-16T00:14:47.9100927Z [ERROR] Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 135.621 s <<< FAILURE! - in org.apache.flink.tests.util.kafka.StreamingKafkaITCase
> 2020-04-16T00:14:47.9103036Z [ERROR] testKafka[0: kafka-version:0.10.2.0](org.apache.flink.tests.util.kafka.StreamingKafkaITCase) Time elapsed: 46.222 s <<< FAILURE!
> 2020-04-16T00:14:47.9104033Z java.lang.AssertionError: expected:<[elephant,27,64213]> but was:<[]>
> 2020-04-16T00:14:47.9104638Z 	at org.junit.Assert.fail(Assert.java:88)
> 2020-04-16T00:14:47.9105148Z 	at org.junit.Assert.failNotEquals(Assert.java:834)
> 2020-04-16T00:14:47.9105701Z 	at org.junit.Assert.assertEquals(Assert.java:118)
> 2020-04-16T00:14:47.9106239Z 	at org.junit.Assert.assertEquals(Assert.java:144)
> 2020-04-16T00:14:47.9107177Z 	at org.apache.flink.tests.util.kafka.StreamingKafkaITCase.testKafka(StreamingKafkaITCase.java:162)
> 2020-04-16T00:14:47.9107845Z 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-04-16T00:14:47.9108434Z 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-04-16T00:14:47.9109318Z 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-04-16T00:14:47.9109914Z 	at java.lang.reflect.Method.invoke(Method.java:498)
> 2020-04-16T00:14:47.9110434Z 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-04-16T00:14:47.9110985Z 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-04-16T00:14:47.9111548Z 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-04-16T00:14:47.9112083Z 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-04-16T00:14:47.9112629Z 	at org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-16T00:14:47.9113145Z 	at org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-16T00:14:47.9113637Z 	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 2020-04-16T00:14:47.9114072Z 	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2020-04-16T00:14:47.9114490Z 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2020-04-16T00:14:47.9115256Z 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2020-04-16T00:14:47.9115791Z 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2020-04-16T00:14:47.9116292Z 	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-04-16T00:14:47.9116736Z 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-04-16T00:14:47.9117779Z 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-04-16T00:14:47.9118274Z 	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-04-16T00:14:47.9118766Z 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2020-04-16T00:14:47.9119204Z 	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 2020-04-16T00:14:47.9119625Z 	at org.junit.runners.Suite.runChild(Suite.java:128)
> 2020-04-16T00:14:47.9120005Z 	at org.junit.runners.Suite.runChild(Suite.java:27)
> 2020-04-16T00:14:47.9120428Z 	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-04-16T00:14:47.9120876Z 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-04-16T00:14:47.9121350Z 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-04-16T00:14:47.9121805Z 	at org.junit.runners.Parent
[GitHub] [flink] flinkbot edited a comment on pull request #12484: [FLINK-18127][tests] Streamline manual execution of Java E2E tests
flinkbot edited a comment on pull request #12484: URL: https://github.com/apache/flink/pull/12484#issuecomment-638836075 ## CI report: * 683c24369ba03611b096aad5c7e9d634c131549b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2760) * 48ca964fb1f2350c6335ab4726553f38a4a4c1c0 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #12423: [FLINK-18034][runtime] Introduce PreferredLocationsRetriever
flinkbot edited a comment on pull request #12423: URL: https://github.com/apache/flink/pull/12423#issuecomment-636712044 ## CI report: * 362a6480b17c1133aae420e09518b08482edf7e7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2515) * 7a2dfb63f296d835193c2ded3ec2f1f868eaf92a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2845) * 979c16df9319f87d4b96ff32d7e55823d8e41551 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2849)
[GitHub] [flink] flinkbot edited a comment on pull request #12473: [FLINK-18061] [table] TableResult#collect method should return a closeable iterator to avoid resource leak
flinkbot edited a comment on pull request #12473: URL: https://github.com/apache/flink/pull/12473#issuecomment-638574481 ## CI report: * 6ffb0ddcf8041c9e064e05fab3a9e1747c41820f Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2836)
[GitHub] [flink] flinkbot commented on pull request #12506: [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.
flinkbot commented on pull request #12506: URL: https://github.com/apache/flink/pull/12506#issuecomment-639676996 ## CI report: * a6e3fc1886259cb1b7efede2596c9b581df06099 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #12375: [FLINK-17017][runtime] Implements bulk allocation for physical slots
flinkbot edited a comment on pull request #12375: URL: https://github.com/apache/flink/pull/12375#issuecomment-635184560 ## CI report: * cd3fc98c034fdc61235d9109c05b4f55d7423746 UNKNOWN * ff97aeedb303fe540ebb3728ed280de414606978 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2818) * a8902557952ea70746d27e88a392c74724784605 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2848)
[jira] [Commented] (FLINK-16795) End to end tests timeout on Azure
[ https://issues.apache.org/jira/browse/FLINK-16795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126980#comment-17126980 ]

Robert Metzger commented on FLINK-16795:

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=2825&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5

> End to end tests timeout on Azure
> ---------------------------------
>
> Key: FLINK-16795
> URL: https://issues.apache.org/jira/browse/FLINK-16795
> Project: Flink
> Issue Type: Bug
> Components: Build System / Azure Pipelines, Tests
> Affects Versions: 1.11.0
> Reporter: Robert Metzger
> Assignee: Robert Metzger
> Priority: Major
> Labels: pull-request-available
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Example:
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6650&view=logs&j=08866332-78f7-59e4-4f7e-49a56faa3179
> or
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6637&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5
> {code}##[error]The job running on agent Azure Pipelines 6 ran longer than the maximum time of 200 minutes. For more information, see https://go.microsoft.com/fwlink/?linkid=2077134
> {code}
> and {code}##[error]The operation was canceled.{code}

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #12453: [FLINK-18061] [table] TableResult#collect method should return a closeable iterator to avoid resource leak
flinkbot edited a comment on pull request #12453: URL: https://github.com/apache/flink/pull/12453#issuecomment-637941328 ## CI report: * ae99323647e5a2febfd0696738b53b540b72f3a2 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2835)
[GitHub] [flink] flinkbot commented on pull request #12506: [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.
flinkbot commented on pull request #12506:
URL: https://github.com/apache/flink/pull/12506#issuecomment-639664025

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Automated Checks

Last check on commit a6e3fc1886259cb1b7efede2596c9b581df06099 (Fri Jun 05 17:48:33 UTC 2020)

**Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] kl0u commented on a change in pull request #12501: [FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config map
kl0u commented on a change in pull request #12501:
URL: https://github.com/apache/flink/pull/12501#discussion_r436070973

## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java

@@ -148,6 +149,7 @@ private Pod decoratePod(Pod pod) {
         // remove kubernetes.config.file
         propertiesMap.remove(KubernetesConfigOptions.KUBE_CONFIG_FILE.key());
+        propertiesMap.remove(DeploymentOptionsInternal.CONF_DIR.key());

Review comment:
I think it would be safer to use `flinkConfig.removeConfig()` which also removes fallback keys.
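The concern in the review comment above can be sketched outside of Flink: a plain `Map.remove(key)` only drops the primary key, so a value stored under an older fallback key survives. The `Option` class, the `removeWithFallbacks` helper, and the key names below are hypothetical stand-ins for illustration; they are not Flink's `ConfigOption` or `Configuration` classes.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FallbackKeyRemoval {

    /** Hypothetical stand-in for a config option: one primary key plus older fallback keys. */
    static final class Option {
        final String key;
        final List<String> fallbackKeys;

        Option(String key, String... fallbackKeys) {
            this.key = key;
            this.fallbackKeys = Arrays.asList(fallbackKeys);
        }
    }

    /** Removes the primary key and every fallback key; returns true if anything was removed. */
    static boolean removeWithFallbacks(Map<String, String> properties, Option option) {
        boolean removed = properties.remove(option.key) != null;
        for (String fallback : option.fallbackKeys) {
            removed |= properties.remove(fallback) != null;
        }
        return removed;
    }

    public static void main(String[] args) {
        // Key names are made up for illustration.
        Option confDir = new Option("internal.conf-dir", "legacy.conf-dir");

        Map<String, String> properties = new HashMap<>();
        properties.put("legacy.conf-dir", "/opt/flink/conf");

        // Removing only the primary key leaves the fallback entry behind.
        properties.remove(confDir.key);
        System.out.println("fallback still present: " + properties.containsKey("legacy.conf-dir"));

        // Removing the primary key and all fallback keys clears it.
        removeWithFallbacks(properties, confDir);
        System.out.println("map empty: " + properties.isEmpty());
    }
}
```

Whether the option under review actually declares fallback keys is not stated in the thread; the sketch only shows why the reviewer considers the option-aware removal the safer default.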
[GitHub] [flink] AHeise opened a new pull request #12506: [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.
AHeise opened a new pull request #12506:
URL: https://github.com/apache/flink/pull/12506

[unchanged 1.11 backport of https://github.com/apache/flink/pull/12493]

## What is the purpose of the change

`CheckpointBarrierUnaligner#hasInflightData` was not called with input gate contextual information, such that only the same first few channels are checked during initial snapshotting of inflight data for multi-gate setups.

## Brief change log

- Using `InputChannelInfo` in `hasInflightData` to get the flattened index

## Verifying this change

- Added `StreamTaskNetworkInputTest#testSnapshotWithTwoInputGates`.
- Various ITCases with sporadic data losses.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
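To illustrate the kind of bug the pull request above describes: when per-channel state lives in one flat array across all input gates, a lookup by the gate-local channel index alone always lands in the first gate's range. The sketch below assumes such a flat layout with per-gate offsets; `FlatChannelIndex` and its methods are hypothetical and are not Flink's `CheckpointBarrierUnaligner` or `InputChannelInfo`.

```java
public class FlatChannelIndex {

    // offsets[g] is the flat index of channel 0 of gate g.
    private final int[] offsets;
    private final boolean[] hasInflightData;

    FlatChannelIndex(int[] channelsPerGate) {
        offsets = new int[channelsPerGate.length];
        int total = 0;
        for (int gate = 0; gate < channelsPerGate.length; gate++) {
            offsets[gate] = total;
            total += channelsPerGate[gate];
        }
        hasInflightData = new boolean[total];
    }

    /** Maps (gate, gate-local channel) to a position in the flat array. */
    int flattenedIndex(int gateIndex, int channelIndex) {
        return offsets[gateIndex] + channelIndex;
    }

    void markInflight(int gateIndex, int channelIndex) {
        hasInflightData[flattenedIndex(gateIndex, channelIndex)] = true;
    }

    /** Lookup that uses both the gate and the channel index. */
    boolean hasInflightData(int gateIndex, int channelIndex) {
        return hasInflightData[flattenedIndex(gateIndex, channelIndex)];
    }

    /** Buggy lookup for comparison: ignores the gate, so it only ever sees gate 0's slots. */
    boolean hasInflightDataIgnoringGate(int channelIndex) {
        return hasInflightData[channelIndex];
    }

    public static void main(String[] args) {
        // Two gates: gate 0 has 2 channels, gate 1 has 3 channels.
        FlatChannelIndex state = new FlatChannelIndex(new int[] {2, 3});
        state.markInflight(1, 0);

        System.out.println("flat index of (1, 0): " + state.flattenedIndex(1, 0));
        System.out.println("gate-aware lookup:    " + state.hasInflightData(1, 0));
        System.out.println("gate-blind lookup:    " + state.hasInflightDataIgnoringGate(0));
    }
}
```

With two gates, the gate-blind lookup for channel 0 reads gate 0's slot even when the data is inflight on gate 1, which matches the "only the same first few channels are checked" symptom in the description.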
[GitHub] [flink] zentol commented on a change in pull request #12484: [FLINK-18127][tests] Streamline manual execution of Java E2E tests
zentol commented on a change in pull request #12484: URL: https://github.com/apache/flink/pull/12484#discussion_r436067756 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResourceFactory.java ## @@ -33,19 +37,83 @@ public final class LocalStandaloneFlinkResourceFactory implements FlinkResourceFactory { private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneFlinkResourceFactory.class); + private static final ParameterProperty PROJECT_ROOT_DIRECTORY = new ParameterProperty<>("rootDir", Paths::get); private static final ParameterProperty DISTRIBUTION_DIRECTORY = new ParameterProperty<>("distDir", Paths::get); private static final ParameterProperty DISTRIBUTION_LOG_BACKUP_DIRECTORY = new ParameterProperty<>("logBackupDir", Paths::get); @Override public FlinkResource create(FlinkResourceSetup setup) { Optional distributionDirectory = DISTRIBUTION_DIRECTORY.get(); if (!distributionDirectory.isPresent()) { - throw new IllegalArgumentException("The distDir property was not set. You can set it when running maven via -DdistDir= ."); + // distDir was not explicitly set, let's search for it + + Path projectRootPath; + Optional projectRoot = PROJECT_ROOT_DIRECTORY.get(); + if (projectRoot.isPresent()) { + // running with maven + projectRootPath = projectRoot.get(); + } else { + // running in the IDE; working directory is test module + Optional projectRootDirectory = findProjectRootDirectory(Paths.get("").toAbsolutePath()); + // this distinction is required in case this class is used outside of Flink + if (projectRootDirectory.isPresent()) { + projectRootPath = projectRootDirectory.get(); + } else { + throw new IllegalArgumentException( + "The 'distDir' property was not set and the flink-dist module could not be found automatically." 
+ + " Please point the 'distDir' property to the directory containing distribution; you can set it when running maven via -DdistDir= ."); + } + } + Optional distribution = findDistribution(projectRootPath); + if (!distribution.isPresent()) { + throw new IllegalArgumentException( + "The 'distDir' property was not set and a distribution could not be found automatically." + + " Please point the 'distDir' property to the directory containing distribution; you can set it when running maven via -DdistDir= ."); + } else { + distributionDirectory = distribution; + } } Optional logBackupDirectory = DISTRIBUTION_LOG_BACKUP_DIRECTORY.get(); if (!logBackupDirectory.isPresent()) { LOG.warn("Property {} not set, logs will not be backed up in case of test failures.", DISTRIBUTION_LOG_BACKUP_DIRECTORY.getPropertyName()); } return new LocalStandaloneFlinkResource(distributionDirectory.get(), logBackupDirectory.orElse(null), setup); } + + public static void main(String[] args) { + System.out.println(findProjectRootDirectory(Paths.get("").toAbsolutePath())); + } Review comment: whoops,
[GitHub] [flink] zentol commented on a change in pull request #12484: [FLINK-18127][tests] Streamline manual execution of Java E2E tests
zentol commented on a change in pull request #12484: URL: https://github.com/apache/flink/pull/12484#discussion_r436067636 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResourceFactory.java ## @@ -33,19 +37,83 @@ public final class LocalStandaloneFlinkResourceFactory implements FlinkResourceFactory { private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneFlinkResourceFactory.class); + private static final ParameterProperty PROJECT_ROOT_DIRECTORY = new ParameterProperty<>("rootDir", Paths::get); private static final ParameterProperty DISTRIBUTION_DIRECTORY = new ParameterProperty<>("distDir", Paths::get); private static final ParameterProperty DISTRIBUTION_LOG_BACKUP_DIRECTORY = new ParameterProperty<>("logBackupDir", Paths::get); @Override public FlinkResource create(FlinkResourceSetup setup) { Optional distributionDirectory = DISTRIBUTION_DIRECTORY.get(); if (!distributionDirectory.isPresent()) { - throw new IllegalArgumentException("The distDir property was not set. You can set it when running maven via -DdistDir= ."); + // distDir was not explicitly set, let's search for it + + Path projectRootPath; + Optional projectRoot = PROJECT_ROOT_DIRECTORY.get(); + if (projectRoot.isPresent()) { + // running with maven + projectRootPath = projectRoot.get(); + } else { + // running in the IDE; working directory is test module + Optional projectRootDirectory = findProjectRootDirectory(Paths.get("").toAbsolutePath()); + // this distinction is required in case this class is used outside of Flink + if (projectRootDirectory.isPresent()) { + projectRootPath = projectRootDirectory.get(); + } else { + throw new IllegalArgumentException( + "The 'distDir' property was not set and the flink-dist module could not be found automatically." 
+ + " Please point the 'distDir' property to the directory containing distribution; you can set it when running maven via -DdistDir= ."); + } + } + Optional distribution = findDistribution(projectRootPath); + if (!distribution.isPresent()) { + throw new IllegalArgumentException( + "The 'distDir' property was not set and a distribution could not be found automatically." + + " Please point the 'distDir' property to the directory containing distribution; you can set it when running maven via -DdistDir= ."); + } else { + distributionDirectory = distribution; + } } Optional logBackupDirectory = DISTRIBUTION_LOG_BACKUP_DIRECTORY.get(); if (!logBackupDirectory.isPresent()) { LOG.warn("Property {} not set, logs will not be backed up in case of test failures.", DISTRIBUTION_LOG_BACKUP_DIRECTORY.getPropertyName()); } return new LocalStandaloneFlinkResource(distributionDirectory.get(), logBackupDirectory.orElse(null), setup); } + + public static void main(String[] args) { + System.out.println(findProjectRootDirectory(Paths.get("").toAbsolutePath())); + } + + private static Optional findProjectRootDirectory(Path currentDirectory) { + // move up the module structure until we find flink-dist; relies on all modules being prefixed with 'flink' + do { + if (Files.exists(currentDirectory.resolve("flink-dist"))) { + return Optional.of(currentDirectory); + } + currentDirectory = currentDirectory.getParent(); + } while (currentDirectory.getFileName().toString().startsWith("flink")); + return Optional.empty(); + } + + private static Optional findDistribution(Path projectRootDirectory) { + final Path distTargetDirectory = projectRootDirectory.resolve("flink-dist").resolve("target"); + try { + Collection paths = FileUtils.listFilesInDirectory(distTargetDirectory, p -> p.getFileName().toString().contains("flink-dist")); +
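A note on the `findProjectRootDirectory` loop quoted above: `Path#getParent` returns `null` when a path has no parent, and `Path#getFileName` returns `null` for a root path, so the `do`/`while` condition can throw a `NullPointerException` if the walk ever reaches the filesystem root. A null-safe variant might look like the sketch below. It is not the code from the pull request, and the class name `ProjectRootFinder` and the directory names in `main` are made up for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

public class ProjectRootFinder {

    /**
     * Walks up from the given directory until a directory containing "flink-dist" is found.
     * Like the quoted loop, it stops as soon as a parent's name no longer starts with "flink",
     * but it also stops cleanly at the filesystem root instead of dereferencing null.
     */
    static Optional<Path> findProjectRootDirectory(Path start) {
        Path current = start;
        while (current != null) {
            if (Files.exists(current.resolve("flink-dist"))) {
                return Optional.of(current);
            }
            Path parent = current.getParent();
            // getParent() is null when there is no parent; getFileName() is null for a root path.
            if (parent == null
                    || parent.getFileName() == null
                    || !parent.getFileName().toString().startsWith("flink")) {
                return Optional.empty();
            }
            current = parent;
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws IOException {
        // Build a throwaway layout: <tmp>/flink-root/{flink-dist, flink-tests/flink-module}
        Path tmp = Files.createTempDirectory("root-finder-demo");
        Path root = Files.createDirectories(tmp.resolve("flink-root"));
        Files.createDirectories(root.resolve("flink-dist"));
        Path module = Files.createDirectories(root.resolve("flink-tests").resolve("flink-module"));

        System.out.println("found expected root: "
                + findProjectRootDirectory(module).equals(Optional.of(root)));
    }
}
```

The sketch keeps the original's reliance on every module directory being prefixed with `flink`; it only changes how the walk terminates.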
[GitHub] [flink] zhuzhurk commented on a change in pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling
zhuzhurk commented on a change in pull request #12256: URL: https://github.com/apache/flink/pull/12256#discussion_r436060993 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest; +import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.FlinkException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This slot allocator will request one physical slot for each single execution vertex. + * The slots will be requested in bulks so that the {@link SlotProvider} can check + * whether this bulk of slot requests can be fulfilled at the same time. + * It has several limitations: + * + * 1. Slot sharing will be ignored. + * + * 2. Co-location constraints are not allowed. + * + * 3. Intra-bulk input location preferences will be ignored. 
+ */ +public class OneSlotPerExecutionSlotAllocator extends AbstractExecutionSlotAllocator { + + private static final Logger LOG = LoggerFactory.getLogger(OneSlotPerExecutionSlotAllocator.class); + + private final SlotProvider slotProvider; + + private final SlotOwner slotOwner; + + private final boolean slotWillBeOccupiedIndefinitely; + + private final Time allocationTimeout; + + public OneSlotPerExecutionSlotAllocator( + final SlotProvider slotProvider, + final InputsLocationsRetriever inputsLocationsRetriever, + final boolean slotWillBeOccupiedIndefinitely, + final Time allocationTimeout) { + + super(inputsLocationsRetriever); + + this.slotProvider = checkNotNull(slotProvider); + this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely; + this.allocationTimeout = checkNotNull(allocationTimeout); + + this.slotOwner = new OneSlotPerExecutionSlotAllocatorSlotOwner(); + } + + @Override + public List allocateSlotsFor( + final List executionVertexSchedulingRequirements) { + + validateSchedulingRequirements(executionVertexSchedulingRequirements); + + validateNoCoLocationConstraint(executionVertexSchedulingRequirements); + + final Set allExecutionVertexIds = executionVertexSchedulingRequirements.stream() + .map(ExecutionVertexSchedulingRequirements::getExecutionVertexId) + .collect(Collectors.toSet()); + + final Map executionVertexSlotRequestIds = + generateExecutionVertexSlotRequestIds(allExecutionVertexIds); + + final List slotExecutionVertexAssignments = + createSlotExecutionVertexAssignments(executionVertexSchedulingReq
[GitHub] [flink] flinkbot edited a comment on pull request #12460: [FLINK-18063][checkpointing] Fix the race condition for aborting current checkpoint in CheckpointBarrierUnaligner
flinkbot edited a comment on pull request #12460: URL: https://github.com/apache/flink/pull/12460#issuecomment-638102290 ## CI report: * 1d838c03d2b9f9744cae6fc03a919db72cc0efd9 UNKNOWN * 31da2083df6221fd36c6ed5674927eed1cc68088 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2828) * 5729182fd15a074bebba7f878113781c2b119fd3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2846)
[GitHub] [flink] zhijiangW commented on a change in pull request #12460: [FLINK-18063][checkpointing] Fix the race condition for aborting current checkpoint in CheckpointBarrierUnaligner
zhijiangW commented on a change in pull request #12460: URL: https://github.com/apache/flink/pull/12460#discussion_r436055041 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java ## @@ -512,6 +512,123 @@ public void testConcurrentProcessBarrierAndNotifyBarrierReceived() throws Except } } + /** +* Tests {@link CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)} +* abort the current pending checkpoint triggered by +* {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, InputChannelInfo)}. +*/ + @Test + public void testProcessCancellationBarrierWitchPendingCheckpoint() throws Exception { + final long checkpointId = 0L; + final ValidatingCheckpointInvokable invokable = new ValidatingCheckpointInvokable(); + final CheckpointBarrierUnaligner handler = new CheckpointBarrierUnaligner( + new int[] { 1 }, ChannelStateWriter.NO_OP, "test", invokable); + + ThreadSafeUnaligner unaligner = handler.getThreadSafeUnaligner(); + // should trigger respective checkpoint + unaligner.notifyBarrierReceived(buildCheckpointBarrier(checkpointId), new InputChannelInfo(0, 0)); + + assertFalse(handler.isCheckpointPending()); + assertTrue(unaligner.isCheckpointPending()); + assertEquals(-1L, handler.getLatestCheckpointId()); + assertEquals(checkpointId, unaligner.getCurrentCheckpointId()); + + testProcessCancellationBarrier(handler, invokable, checkpointId); + } + + /** +* Tests {@link CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)} +* abort the current pending checkpoint triggered by +* {@link CheckpointBarrierUnaligner#processBarrier(CheckpointBarrier, int)}. 
+*/ + @Test + public void testProcessCancellationBarrierWitchPendingCheckpoint2() throws Exception { + final long checkpointId = 0L; + final ValidatingCheckpointInvokable invokable = new ValidatingCheckpointInvokable(); + final CheckpointBarrierUnaligner handler = new CheckpointBarrierUnaligner( + new int[] { 1 }, ChannelStateWriter.NO_OP, "test", invokable); + + // should trigger respective checkpoint + handler.processBarrier(buildCheckpointBarrier(checkpointId), 0); + + assertTrue(handler.isCheckpointPending()); + assertTrue(handler.getThreadSafeUnaligner().isCheckpointPending()); + assertEquals(checkpointId, handler.getLatestCheckpointId()); + assertEquals(checkpointId, handler.getThreadSafeUnaligner().getCurrentCheckpointId()); + + testProcessCancellationBarrier(handler, invokable, checkpointId); + } + + @Test + public void testProcessCancellationBarrierBeforeProcessAndReceiveBarrier() throws Exception { + final long checkpointId = 0L; + final ValidatingCheckpointInvokable invokable = new ValidatingCheckpointInvokable(); + final CheckpointBarrierUnaligner handler = new CheckpointBarrierUnaligner( + new int[] { 1 }, ChannelStateWriter.NO_OP, "test", invokable); + + handler.processCancellationBarrier(new CancelCheckpointMarker(checkpointId)); + + verifyTriggeredCheckpoint(handler, invokable, checkpointId); + + // it would not trigger checkpoint since the respective cancellation barrier already happened before + handler.processBarrier(buildCheckpointBarrier(checkpointId), 0); + handler.getThreadSafeUnaligner().notifyBarrierReceived(buildCheckpointBarrier(checkpointId), new InputChannelInfo(0, 0)); + + verifyTriggeredCheckpoint(handler, invokable, checkpointId); + } + + private void testProcessCancellationBarrier( + CheckpointBarrierUnaligner handler, + ValidatingCheckpointInvokable invokable, + long currentCheckpointId) throws Exception { + + // should abort current checkpoint while processing CancelCheckpointMarker + handler.processCancellationBarrier(new 
CancelCheckpointMarker(currentCheckpointId)); + verifyTriggeredCheckpoint(handler, invokable, currentCheckpointId); + + final long canceledCheckpointId = 1L; + // should update current checkpoint id and abort notification while processing CancelCheckpointMarker + handler.processCancellationBarrier(new CancelCheckpointMarker(canceledCheckpointId)); + verifyTriggeredCheckpoint(handler, invokable, canceledCheckpointId); Review comment:
[GitHub] [flink] zhijiangW merged pull request #12493: [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.
zhijiangW merged pull request #12493: URL: https://github.com/apache/flink/pull/12493
[GitHub] [flink] flinkbot edited a comment on pull request #12503: [FLINK-18151][python] Resolve CWE22 problems in pyflink_gateway_server.py
flinkbot edited a comment on pull request #12503: URL: https://github.com/apache/flink/pull/12503#issuecomment-639444578 ## CI report: * da2d996d1c4170fa70772715ca027cdc7d65407a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2830) * ce4f8beceb4672b8daa260f1b484a5790b0ad719 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2839)
[GitHub] [flink] flinkbot edited a comment on pull request #12423: [FLINK-18034][runtime] Introduce PreferredLocationsRetriever
flinkbot edited a comment on pull request #12423: URL: https://github.com/apache/flink/pull/12423#issuecomment-636712044 ## CI report: * 362a6480b17c1133aae420e09518b08482edf7e7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2515) * 7a2dfb63f296d835193c2ded3ec2f1f868eaf92a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2845) * 979c16df9319f87d4b96ff32d7e55823d8e41551 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #12460: [FLINK-18063][checkpointing] Fix the race condition for aborting current checkpoint in CheckpointBarrierUnaligner
flinkbot edited a comment on pull request #12460: URL: https://github.com/apache/flink/pull/12460#issuecomment-638102290 ## CI report: * 1d838c03d2b9f9744cae6fc03a919db72cc0efd9 UNKNOWN * b2c4d6aaf78612e7214cc49d22e2b90fe29b93b6 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2686) * 31da2083df6221fd36c6ed5674927eed1cc68088 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2828) * 5729182fd15a074bebba7f878113781c2b119fd3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2846)
[GitHub] [flink] flinkbot edited a comment on pull request #12493: [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.
flinkbot edited a comment on pull request #12493: URL: https://github.com/apache/flink/pull/12493#issuecomment-639156189 ## CI report: * 1973dc85708e04da3b66b511eb89c84478087dd5 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2829)
[GitHub] [flink] flinkbot edited a comment on pull request #12375: [FLINK-17017][runtime] Implements bulk allocation for physical slots
flinkbot edited a comment on pull request #12375: URL: https://github.com/apache/flink/pull/12375#issuecomment-635184560 ## CI report: * cd3fc98c034fdc61235d9109c05b4f55d7423746 UNKNOWN * ff97aeedb303fe540ebb3728ed280de414606978 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2818) * a8902557952ea70746d27e88a392c74724784605 UNKNOWN
[jira] [Created] (FLINK-18164) null <> 'str' should be true
Benchao Li created FLINK-18164: -- Summary: null <> 'str' should be true Key: FLINK-18164 URL: https://issues.apache.org/jira/browse/FLINK-18164 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: Benchao Li Currently, if we compare null with other literals, the result will always be false. It's because the code gen always gives a default value (false) for the result. And I think it's a bug if `null <> 'str'` is false. It's reported from user-zh: http://apache-flink.147419.n8.nabble.com/flink-sql-null-false-td3640.html CC [~jark] [~ykt836] -- This message was sent by Atlassian Jira (v8.3.4#803005)
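The behaviour reported in FLINK-18164 can be illustrated with a minimal sketch. This is not Flink's generated code: the class `NullComparisonSketch`, the `Result` holder with its null flag, and the `passesFilter` helper are all hypothetical names chosen for illustration. The only detail taken from the report is that the result value starts from a default of `false`, so a comparison involving null ends up being treated as false.

```java
public class NullComparisonSketch {

    /** Hypothetical stand-in for a generated expression result: a value plus a null flag. */
    public static final class Result {
        public final boolean isNull;
        public final boolean value;

        public Result(boolean isNull, boolean value) {
            this.isNull = isNull;
            this.value = value;
        }
    }

    /** Evaluates {@code left <> right}; the value keeps its default when an operand is null. */
    public static Result notEquals(String left, String right) {
        boolean isNull = left == null || right == null;
        boolean value = false; // default value, as described in the report
        if (!isNull) {
            value = !left.equals(right);
        }
        return new Result(isNull, value);
    }

    /** Assumed consumer that reads only the value, so a null result behaves like false. */
    public static boolean passesFilter(Result result) {
        return result.value;
    }

    public static void main(String[] args) {
        Result r = notEquals(null, "str");
        System.out.println("isNull=" + r.isNull + ", value=" + r.value);
        System.out.println("passesFilter=" + passesFilter(r));
    }
}
```

Under these assumptions, `notEquals(null, "str")` never overwrites the default, which matches the "always false" symptom the reporter describes; what the result should be instead is the subject of the issue itself.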
[jira] [Commented] (FLINK-17560) No Slots available exception in Apache Flink Job Manager while Scheduling
[ https://issues.apache.org/jira/browse/FLINK-17560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126933#comment-17126933 ] Chesnay Schepler commented on FLINK-17560: -- There's a ConcurrentModificationException in the TM logs when the slots are being offered. If this is a bug in the slot allocation protocol then the only option I see is to try a later Flink version. Are they running a customized Flink version? > No Slots available exception in Apache Flink Job Manager while Scheduling > - > > Key: FLINK-17560 > URL: https://issues.apache.org/jira/browse/FLINK-17560 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.8.3 > Environment: Flink version 1.8.3 > Session cluster >Reporter: josson paul kalapparambath >Priority: Major > Attachments: jobmgr.log, threaddump-tm.txt, tm.log > > > Set up > -- > Flink version 1.8.3 > Zookeeper HA cluster > 1 ResourceManager/Dispatcher (Same Node) > 1 TaskManager > 4 pipelines running with various parallelisms > Issue > -- > Occasionally when the Job Manager gets restarted we noticed that all the > pipelines are not getting scheduled. The error that is reported by the Job > Manager is 'not enough slots are available'. This should not be the case > because task manager was deployed with sufficient slots for the number of > pipelines/parallelism we have. > We further noticed that the slot report sent by the task manager contains slots > filled with old CANCELLED job Ids. I am not sure why the task manager still > holds the details of the old jobs. Thread dump on the task manager confirms > that old pipelines are not running. > I am aware of https://issues.apache.org/jira/browse/FLINK-12865. But this is > not the issue happening in this case.
[GitHub] [flink] flinkbot edited a comment on pull request #12505: [FLINK-17765] Simplify Flink stack traces
flinkbot edited a comment on pull request #12505: URL: https://github.com/apache/flink/pull/12505#issuecomment-639597366 ## CI report: * 9e1468b9780de1ceecae32707a836f05561a14b1 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2844)
[GitHub] [flink] flinkbot edited a comment on pull request #12471: [FLINK-18073][FLINK-18029][avro] Fix AvroRowDataSerializationSchema is not serializable and add IT cases
flinkbot edited a comment on pull request #12471: URL: https://github.com/apache/flink/pull/12471#issuecomment-638362836 ## CI report: * 4bcc31b01d257710de3d5828167f97e0cd00ff03 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2826)
[GitHub] [flink] flinkbot edited a comment on pull request #12423: [FLINK-18034][runtime] Introduce PreferredLocationsRetriever
flinkbot edited a comment on pull request #12423: URL: https://github.com/apache/flink/pull/12423#issuecomment-636712044 ## CI report: * 362a6480b17c1133aae420e09518b08482edf7e7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2515) * 7a2dfb63f296d835193c2ded3ec2f1f868eaf92a UNKNOWN
[GitHub] [flink] zhuzhurk commented on a change in pull request #12423: [FLINK-18034][runtime] Introduce PreferredLocationsRetriever
zhuzhurk commented on a change in pull request #12423: URL: https://github.com/apache/flink/pull/12423#discussion_r436028808 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocator.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Base class for all {@link ExecutionSlotAllocator}. 
It is responsible to allocate slots for tasks and + * keep the unfulfilled slot requests for further cancellation. + */ +public abstract class AbstractExecutionSlotAllocator implements ExecutionSlotAllocator { + + /** +* Store the uncompleted slot assignments. +*/ + protected final Map pendingSlotAssignments; + + protected final PreferredLocationsRetriever preferredLocationsRetriever; + + public AbstractExecutionSlotAllocator(final PreferredLocationsRetriever preferredLocationsRetriever) { + this.preferredLocationsRetriever = checkNotNull(preferredLocationsRetriever); + this.pendingSlotAssignments = new HashMap<>(); + } + + @Override + public void cancel(final ExecutionVertexID executionVertexId) { + final SlotExecutionVertexAssignment slotExecutionVertexAssignment = pendingSlotAssignments.get(executionVertexId); + if (slotExecutionVertexAssignment != null) { + slotExecutionVertexAssignment.getLogicalSlotFuture().cancel(false); + } + } + + @Override + public CompletableFuture stop() { + final List executionVertexIds = new ArrayList<>(pendingSlotAssignments.keySet()); + executionVertexIds.forEach(this::cancel); + Review comment: I will make the change "getNumberOfPendingSlotAssignments -> getPendingSlotAssignments" in #12256 when introducing `AbstractExecutionSlotAllocator`. It would be good for verifying the existence of a given assignment.
[GitHub] [flink] zhuzhurk commented on a change in pull request #12423: [FLINK-18034][runtime] Introduce PreferredLocationsRetriever
zhuzhurk commented on a change in pull request #12423: URL: https://github.com/apache/flink/pull/12423#discussion_r436024342 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocator.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Base class for all {@link ExecutionSlotAllocator}. 
It is responsible to allocate slots for tasks and + * keep the unfulfilled slot requests for further cancellation. + */ +public abstract class AbstractExecutionSlotAllocator implements ExecutionSlotAllocator { + + /** +* Store the uncompleted slot assignments. +*/ + protected final Map pendingSlotAssignments; + + protected final PreferredLocationsRetriever preferredLocationsRetriever; + + public AbstractExecutionSlotAllocator(final PreferredLocationsRetriever preferredLocationsRetriever) { + this.preferredLocationsRetriever = checkNotNull(preferredLocationsRetriever); + this.pendingSlotAssignments = new HashMap<>(); + } + + @Override + public void cancel(final ExecutionVertexID executionVertexId) { + final SlotExecutionVertexAssignment slotExecutionVertexAssignment = pendingSlotAssignments.get(executionVertexId); + if (slotExecutionVertexAssignment != null) { + slotExecutionVertexAssignment.getLogicalSlotFuture().cancel(false); + } + } + + @Override + public CompletableFuture stop() { + final List executionVertexIds = new ArrayList<>(pendingSlotAssignments.keySet()); + executionVertexIds.forEach(this::cancel); + Review comment: `ExecutionSlotAllocator#stop()` is removed via hotfix 7a2dfb63f296d835193c2ded3ec2f1f868eaf92a
[jira] [Assigned] (FLINK-17973) Test memory configuration of Flink cluster
[ https://issues.apache.org/jira/browse/FLINK-17973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-17973: Assignee: Chesnay Schepler > Test memory configuration of Flink cluster > -- > > Key: FLINK-17973 > URL: https://issues.apache.org/jira/browse/FLINK-17973 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination, Tests >Affects Versions: 1.11.0 >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Critical > Labels: release-testing > Fix For: 1.11.0 > > > Make sure that Flink processes (in particular Master processes) fail with a > meaningful exception message if they exceed the configured memory budgets.
[jira] [Created] (FLINK-18163) Should be volatile: network.api.writer.RecordWriter.flusherException
Roman Khachatryan created FLINK-18163: - Summary: Should be volatile: network.api.writer.RecordWriter.flusherException Key: FLINK-18163 URL: https://issues.apache.org/jira/browse/FLINK-18163 Project: Flink Issue Type: Bug Components: Runtime / Task Affects Versions: 1.11.0 Reporter: Roman Khachatryan Assignee: Roman Khachatryan Fix For: 1.12.0
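The pattern behind FLINK-18163 can be sketched as follows. This is a hedged illustration, not the actual `RecordWriter`: the class `FlusherExceptionSketch`, its method names, and the exception message are assumptions. It shows a field that one thread writes and another thread reads, which is the situation where `volatile` is normally needed for the write to be reliably visible.

```java
public class FlusherExceptionSketch {

    // Written by the background flusher thread and read by the caller's thread.
    // Declaring it volatile guarantees the reader sees the most recent write.
    private volatile Throwable flusherException;

    /** Runs the (hypothetical) flush action on a separate thread and records any failure. */
    public void flushInBackground(Runnable flushAction) {
        Thread flusher = new Thread(() -> {
            try {
                flushAction.run();
            } catch (Throwable t) {
                flusherException = t;
            }
        }, "hypothetical-output-flusher");
        flusher.start();
        try {
            // The join only keeps this demo deterministic; a long-running flusher would not be joined.
            flusher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Called from the writing thread before emitting; surfaces a failure seen by the flusher. */
    public void checkErroneous() {
        Throwable t = flusherException;
        if (t != null) {
            throw new IllegalStateException("An exception happened while flushing the outputs", t);
        }
    }

    public static void main(String[] args) {
        FlusherExceptionSketch writer = new FlusherExceptionSketch();
        writer.flushInBackground(() -> {
            throw new RuntimeException("simulated flush failure");
        });
        try {
            writer.checkErroneous();
        } catch (IllegalStateException e) {
            System.out.println("Caught: " + e.getCause().getMessage());
        }
    }
}
```

In this demo the `join()` already orders the write before the read, so the example would behave the same without `volatile`; the modifier matters when the flusher keeps running while the other thread polls the field.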
[GitHub] [flink] zhuzhurk commented on a change in pull request #12423: [FLINK-18034][runtime] Introduce PreferredLocationsRetriever
zhuzhurk commented on a change in pull request #12423: URL: https://github.com/apache/flink/pull/12423#discussion_r436018689 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java ## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.runtime.executiongraph.ExecutionVertex.MAX_DISTINCT_LOCATIONS_TO_CONSIDER; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Default implementation of {@link PreferredLocationsRetriever}. + * Locations based on state will be returned if exist. + * Otherwise locations based on inputs will be returned. 
+ */ +public class DefaultPreferredLocationsRetriever implements PreferredLocationsRetriever { + + private final StateLocationRetriever stateLocationRetriever; + + private final InputsLocationsRetriever inputsLocationsRetriever; + + DefaultPreferredLocationsRetriever( + final StateLocationRetriever stateLocationRetriever, + final InputsLocationsRetriever inputsLocationsRetriever) { + + this.stateLocationRetriever = checkNotNull(stateLocationRetriever); + this.inputsLocationsRetriever = checkNotNull(inputsLocationsRetriever); + } + + @Override + public CompletableFuture> getPreferredLocations( + final ExecutionVertexID executionVertexId, + final Set producersToIgnore) { + + checkNotNull(executionVertexId); + checkNotNull(producersToIgnore); + + final Collection preferredLocationsBasedOnState = + getPreferredLocationsBasedOnState(executionVertexId, stateLocationRetriever); + if (!preferredLocationsBasedOnState.isEmpty()) { + return CompletableFuture.completedFuture(preferredLocationsBasedOnState); + } + + return getPreferredLocationsBasedOnInputs(executionVertexId, producersToIgnore, inputsLocationsRetriever); + } + + private static Collection getPreferredLocationsBasedOnState( + final ExecutionVertexID executionVertexId, + final StateLocationRetriever stateLocationRetriever) { + + return stateLocationRetriever.getStateLocation(executionVertexId) + .map(Collections::singleton) + .orElse(Collections.emptySet()); + } + + /** +* Gets the location preferences of the execution, as determined by the locations +* of the predecessors from which it receives input data. +* If there are more than {@link ExecutionVertex#MAX_DISTINCT_LOCATIONS_TO_CONSIDER} different locations of source data, +* or neither the sources have not been started nor will be started with the execution together, +* this method returns an empty collection to indicate no location preference. 
+* +* @return The preferred locations based in input streams, or an empty iterable, +* if there is no input-based preference. +*/ + static CompletableFuture> getPreferredLocationsBasedOnInputs( Review comment: done. ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/PreferredLocationsRetriever.java ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF lice
[GitHub] [flink] zhuzhurk commented on a change in pull request #12423: [FLINK-18034][runtime] Introduce PreferredLocationsRetriever
zhuzhurk commented on a change in pull request #12423: URL: https://github.com/apache/flink/pull/12423#discussion_r436018529 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java ## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.runtime.executiongraph.ExecutionVertex.MAX_DISTINCT_LOCATIONS_TO_CONSIDER; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Default implementation of {@link PreferredLocationsRetriever}. + * Locations based on state will be returned if exist. + * Otherwise locations based on inputs will be returned. 
+ */ +public class DefaultPreferredLocationsRetriever implements PreferredLocationsRetriever { + + private final StateLocationRetriever stateLocationRetriever; + + private final InputsLocationsRetriever inputsLocationsRetriever; + + DefaultPreferredLocationsRetriever( + final StateLocationRetriever stateLocationRetriever, + final InputsLocationsRetriever inputsLocationsRetriever) { + + this.stateLocationRetriever = checkNotNull(stateLocationRetriever); + this.inputsLocationsRetriever = checkNotNull(inputsLocationsRetriever); + } + + @Override + public CompletableFuture> getPreferredLocations( + final ExecutionVertexID executionVertexId, + final Set producersToIgnore) { + + checkNotNull(executionVertexId); + checkNotNull(producersToIgnore); + + final Collection preferredLocationsBasedOnState = + getPreferredLocationsBasedOnState(executionVertexId, stateLocationRetriever); + if (!preferredLocationsBasedOnState.isEmpty()) { + return CompletableFuture.completedFuture(preferredLocationsBasedOnState); + } + + return getPreferredLocationsBasedOnInputs(executionVertexId, producersToIgnore, inputsLocationsRetriever); + } + + private static Collection getPreferredLocationsBasedOnState( + final ExecutionVertexID executionVertexId, + final StateLocationRetriever stateLocationRetriever) { + + return stateLocationRetriever.getStateLocation(executionVertexId) + .map(Collections::singleton) + .orElse(Collections.emptySet()); + } + + /** +* Gets the location preferences of the execution, as determined by the locations +* of the predecessors from which it receives input data. +* If there are more than {@link ExecutionVertex#MAX_DISTINCT_LOCATIONS_TO_CONSIDER} different locations of source data, +* or neither the sources have not been started nor will be started with the execution together, +* this method returns an empty collection to indicate no location preference. 
+* +* @return The preferred locations based in input streams, or an empty iterable, +* if there is no input-based preference. +*/ + static CompletableFuture> getPreferredLocationsBasedOnInputs( + final ExecutionVertexID executionVertexId, + final Set producersToIgnore, + final InputsLocationsRetriever inputsLocationsRetriever) { + + CompletableFuture> preferredLocations = + CompletableFuture.completedFuture(Collections.emptyList()); + + final Collection> locationsFutures = new ArrayLis
[GitHub] [flink] tillrohrmann commented on a change in pull request #12484: [FLINK-18127][tests] Streamline manual execution of Java E2E tests
tillrohrmann commented on a change in pull request #12484: URL: https://github.com/apache/flink/pull/12484#discussion_r436017785 ## File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResourceFactory.java ## @@ -33,19 +37,83 @@ public final class LocalStandaloneFlinkResourceFactory implements FlinkResourceFactory { private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneFlinkResourceFactory.class); + private static final ParameterProperty PROJECT_ROOT_DIRECTORY = new ParameterProperty<>("rootDir", Paths::get); private static final ParameterProperty DISTRIBUTION_DIRECTORY = new ParameterProperty<>("distDir", Paths::get); private static final ParameterProperty DISTRIBUTION_LOG_BACKUP_DIRECTORY = new ParameterProperty<>("logBackupDir", Paths::get); @Override public FlinkResource create(FlinkResourceSetup setup) { Optional distributionDirectory = DISTRIBUTION_DIRECTORY.get(); if (!distributionDirectory.isPresent()) { - throw new IllegalArgumentException("The distDir property was not set. You can set it when running maven via -DdistDir= ."); + // distDir was not explicitly set, let's search for it + + Path projectRootPath; + Optional projectRoot = PROJECT_ROOT_DIRECTORY.get(); + if (projectRoot.isPresent()) { + // running with maven + projectRootPath = projectRoot.get(); + } else { + // running in the IDE; working directory is test module + Optional projectRootDirectory = findProjectRootDirectory(Paths.get("").toAbsolutePath()); + // this distinction is required in case this class is used outside of Flink + if (projectRootDirectory.isPresent()) { + projectRootPath = projectRootDirectory.get(); + } else { + throw new IllegalArgumentException( + "The 'distDir' property was not set and the flink-dist module could not be found automatically." 
+ + " Please point the 'distDir' property to the directory containing distribution; you can set it when running maven via -DdistDir= ."); + } + } + Optional distribution = findDistribution(projectRootPath); + if (!distribution.isPresent()) { + throw new IllegalArgumentException( + "The 'distDir' property was not set and a distribution could not be found automatically." + + " Please point the 'distDir' property to the directory containing distribution; you can set it when running maven via -DdistDir= ."); + } else { + distributionDirectory = distribution; + } } Optional logBackupDirectory = DISTRIBUTION_LOG_BACKUP_DIRECTORY.get(); if (!logBackupDirectory.isPresent()) { LOG.warn("Property {} not set, logs will not be backed up in case of test failures.", DISTRIBUTION_LOG_BACKUP_DIRECTORY.getPropertyName()); } return new LocalStandaloneFlinkResource(distributionDirectory.get(), logBackupDirectory.orElse(null), setup); } + + public static void main(String[] args) { + System.out.println(findProjectRootDirectory(Paths.get("").toAbsolutePath())); + } + + private static Optional findProjectRootDirectory(Path currentDirectory) { + // move up the module structure until we find flink-dist; relies on all modules being prefixed with 'flink' + do { + if (Files.exists(currentDirectory.resolve("flink-dist"))) { + return Optional.of(currentDirectory); + } + currentDirectory = currentDirectory.getParent(); + } while (currentDirectory.getFileName().toString().startsWith("flink")); + return Optional.empty(); + } + + private static Optional findDistribution(Path projectRootDirectory) { + final Path distTargetDirectory = projectRootDirectory.resolve("flink-dist").resolve("target"); + try { + Collection paths = FileUtils.listFilesInDirectory(distTargetDirectory, p -> p.getFileName().toString().contains("flink-dist")); +
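Note on the upward search in `findProjectRootDirectory` quoted above: as written, `currentDirectory.getParent()` returns `null` once the filesystem root is reached, and `getFileName()` is `null` for a root path, so the loop condition can throw a `NullPointerException`. The following is only a minimal sketch of a null-safe variant with the same stopping rule (stop as soon as the parent is not prefixed with `flink`). The class name `ProjectRootFinder` and the `marker`/`prefix` parameters are assumptions made for illustration and are not part of the pull request.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;

// Hypothetical helper, not the code from the pull request.
final class ProjectRootFinder {

    // Walks up from 'start' and returns the first directory containing 'marker'.
    // A parent is only visited if its name starts with 'prefix', which mirrors
    // the "all modules are prefixed with 'flink'" assumption in the quoted diff.
    static Optional<Path> findProjectRoot(Path start, String marker, String prefix) {
        Path current = start.toAbsolutePath();
        while (true) {
            if (Files.exists(current.resolve(marker))) {
                return Optional.of(current);
            }
            Path parent = current.getParent();                    // null at the filesystem root
            Path name = parent == null ? null : parent.getFileName(); // null for a root path
            if (name == null || !name.toString().startsWith(prefix)) {
                return Optional.empty();
            }
            current = parent;
        }
    }

    public static void main(String[] args) {
        // Starts from the working directory, as the IDE branch of the diff does.
        System.out.println(findProjectRoot(Paths.get(""), "flink-dist", "flink"));
    }
}
```

The explicit `null` checks replace the `do ... while` condition, so a start directory outside of any Flink checkout yields `Optional.empty()` instead of an exception.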
[GitHub] [flink] zhuzhurk commented on a change in pull request #12423: [FLINK-18034][runtime] Introduce PreferredLocationsRetriever
zhuzhurk commented on a change in pull request #12423: URL: https://github.com/apache/flink/pull/12423#discussion_r436018318 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirements.java ## @@ -169,8 +161,7 @@ public ExecutionVertexSchedulingRequirements build() { taskResourceProfile, physicalSlotResourceProfile, slotSharingGroupId, - coLocationConstraint, - preferredLocations); + coLocationConstraint); Review comment: done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #12505: [FLINK-17765] Simplify Flink stack traces
flinkbot commented on pull request #12505: URL: https://github.com/apache/flink/pull/12505#issuecomment-639597366 ## CI report: * 9e1468b9780de1ceecae32707a836f05561a14b1 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #12446: [FLINK-16225] Implement user class loading exception handler
flinkbot edited a comment on pull request #12446: URL: https://github.com/apache/flink/pull/12446#issuecomment-637647665 ## CI report: * 7af38d20bc7c7e484cd63b36ec2d603f14a99dfc Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2840) * 426791910cdcdc50824687d842a182f4036dbfc1 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2843) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #12460: [FLINK-18063][checkpointing] Fix the race condition for aborting current checkpoint in CheckpointBarrierUnaligner
flinkbot edited a comment on pull request #12460: URL: https://github.com/apache/flink/pull/12460#issuecomment-638102290 ## CI report: * 1d838c03d2b9f9744cae6fc03a919db72cc0efd9 UNKNOWN * b2c4d6aaf78612e7214cc49d22e2b90fe29b93b6 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2686) * 31da2083df6221fd36c6ed5674927eed1cc68088 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2828) * 5729182fd15a074bebba7f878113781c2b119fd3 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[jira] [Assigned] (FLINK-17765) Verbose client error messages
[ https://issues.apache.org/jira/browse/FLINK-17765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-17765: - Assignee: Till Rohrmann (was: Chesnay Schepler) > Verbose client error messages > - > > Key: FLINK-17765 > URL: https://issues.apache.org/jira/browse/FLINK-17765 > Project: Flink > Issue Type: Bug > Components: Client / Job Submission, Runtime / Coordination, Runtime > / REST >Affects Versions: 1.10.1, 1.11.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Labels: pull-request-available > Fix For: 1.11.0 > > > Some client operations if they fail produce very verbose error messages which > are hard to decipher for the user. For example, if the job submission fails > because the savepoint path does not exist, then the user sees the following: > {code} > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error: java.util.concurrent.ExecutionException: > org.apache.flink.runtime.client.JobSubmissionException: Failed to submit > JobGraph. 
> at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:148) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:689) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:227) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:906) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:982) > at > org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:982) > Caused by: java.lang.RuntimeException: > java.util.concurrent.ExecutionException: > org.apache.flink.runtime.client.JobSubmissionException: Failed to submit > JobGraph. > at > org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:290) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1766) > at > org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:104) > at > org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:71) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1645) > at > org.apache.flink.streaming.examples.statemachine.StateMachineExample.main(StateMachineExample.java:142) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > ... 8 more > Caused by: java.util.concurrent.ExecutionException: > org.apache.flink.runtime.client.JobSubmissionException: Failed to submit > JobGraph. > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1761) > ... 17 more > Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to > submit JobGraph. > at > org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:366) > at > java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) > at > java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:290) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComple
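Note on the trace quoted above: most of its length comes from wrapper exceptions (`RuntimeException`, `ExecutionException`) that only repeat the message of the underlying `JobSubmissionException`. One way to shorten such output is to walk the cause chain past pure wrappers before reporting. The sketch below illustrates that idea in plain Java; it is not the change made for FLINK-17765, and the class name `StackTraceSimplifier` and method `stripWrappers` are hypothetical.

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical illustration, not the code from the FLINK-17765 pull request.
final class StackTraceSimplifier {

    // Returns the first throwable in the cause chain that is not a pure wrapper.
    // "Wrapper" is an assumption here: ExecutionException, CompletionException,
    // or a plain RuntimeException (exact class, not subclasses) that has a cause.
    static Throwable stripWrappers(Throwable throwable) {
        Throwable current = throwable;
        while (isWrapper(current) && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    private static boolean isWrapper(Throwable t) {
        return t instanceof ExecutionException
                || t instanceof CompletionException
                || t.getClass() == RuntimeException.class;
    }

    public static void main(String[] args) {
        Throwable root = new IllegalStateException("Failed to submit JobGraph.");
        Throwable wrapped = new RuntimeException(new ExecutionException(root));
        // Prints only the innermost, informative exception.
        System.out.println(stripWrappers(wrapped));
    }
}
```

Subclasses of `RuntimeException` are deliberately left alone, since they usually carry information of their own.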
[GitHub] [flink] flinkbot commented on pull request #12505: [FLINK-17765] Simplify Flink stack traces
flinkbot commented on pull request #12505: URL: https://github.com/apache/flink/pull/12505#issuecomment-639582390 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 9e1468b9780de1ceecae32707a836f05561a14b1 (Fri Jun 05 15:47:10 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] wuchong commented on a change in pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api
wuchong commented on a change in pull request #12335: URL: https://github.com/apache/flink/pull/12335#discussion_r436004322 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ## @@ -490,8 +490,11 @@ private void insertIntoInternal(UnresolvedIdentifier unresolvedIdentifier, Table private Optional scanInternal(UnresolvedIdentifier identifier) { ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(identifier); - return catalogManager.getTable(tableIdentifier) - .map(t -> new CatalogQueryOperation(tableIdentifier, t.getTable().getSchema())); + return catalogManager.getTable(tableIdentifier).map(t -> { + CatalogTableSchemaResolver resolver = new CatalogTableSchemaResolver(parser); Review comment: Putting it in `CatalogManager#getTable` sounds good to me. We had planned to have a single util to resolve the schema, but I think your idea is better: resolving the schema before CatalogTable is exposed out of CatalogManager avoids forgetting to resolve the schema.
[jira] [Updated] (FLINK-17380) Kafka09ITCase.runAllDeletesTest: "Memory records is not writable"
[ https://issues.apache.org/jira/browse/FLINK-17380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-17380: - Summary: Kafka09ITCase.runAllDeletesTest: "Memory records is not writable" (was: runAllDeletesTest: "Memory records is not writable") > Kafka09ITCase.runAllDeletesTest: "Memory records is not writable" > - > > Key: FLINK-17380 > URL: https://issues.apache.org/jira/browse/FLINK-17380 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka, Tests >Affects Versions: 1.10.0 >Reporter: Robert Metzger >Priority: Major > Labels: test-stability > > CI Run: https://travis-ci.org/github/apache/flink/jobs/678707334 > {code} > 23:27:15,112 [main] ERROR > org.apache.flink.streaming.connectors.kafka.Kafka09ITCase - > > Test > testAllDeletes(org.apache.flink.streaming.connectors.kafka.Kafka09ITCase) > failed with: > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:648) > at > org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:77) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620) > at > org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runAllDeletesTest(KafkaConsumerTestBase.java:1475) > at > org.apache.flink.streaming.connectors.kafka.Kafka09ITCase.testAllDeletes(Kafka09ITCase.java:117) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at java.base/java.lang.Thread.run(Thread.java:834) > Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496) > at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > at akka.japi.pf.UnitCaseS
[GitHub] [flink] tillrohrmann commented on a change in pull request #12446: [FLINK-16225] Implement user class loading exception handler
tillrohrmann commented on a change in pull request #12446: URL: https://github.com/apache/flink/pull/12446#discussion_r435997739 ## File path: flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java ## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import java.net.URL; +import java.net.URLClassLoader; +import java.util.function.Consumer; + +/** + * This class loader accepts a custom handler if an exception occurs in {@link #loadClass(String, boolean)}. 
+ */ +public abstract class FlinkUserCodeClassLoader extends URLClassLoader { + public static final Consumer EMPTY_EXCEPTION_HANDLER = classLoadingException -> {}; Review comment: nit: ```suggestion public static final Consumer NOOP_EXCEPTION_HANDLER = classLoadingException -> {}; ``` ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java ## @@ -79,7 +79,9 @@ public void testRecoveryRegisterAndDownload() throws Exception { final BlobLibraryCacheManager.ClassLoaderFactory classLoaderFactory = BlobLibraryCacheManager.defaultClassLoaderFactory( FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, - new String[0]); + new String[0], + false, + exception -> {}); Review comment: Instead of passing a boolean and the fatal error handler to `defaultClassLoaderFactory()` one could also change it to pass in the `exception handler` instead. That way one would not pass in redundant information `false` and `exception -> {}` but it would suffice to simply pass `exception -> {}`. ## File path: flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java ## @@ -110,6 +110,14 @@ " resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against" + " the fully qualified class name. 
These patterns are appended to \"" + ALWAYS_PARENT_FIRST_LOADER_PATTERNS.key() + "\"."); + @Documentation.Section(Documentation.Sections.EXPERT_CLASS_LOADING) + public static final ConfigOption FAIL_USER_CLASS_LOADING_METASPACE_OOM = ConfigOptions + .key("classloader.fail.on.metaspace.oom.error") + .booleanType() + .defaultValue(true) + .withDescription("Sets whether to fail and exit TaskManager JVM process if 'OutOfMemoryError: Metaspace' is " + Review comment: ```suggestion .withDescription("Fail Flink JVM processes if 'OutOfMemoryError: Metaspace' is " + ``` ## File path: flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java ## @@ -110,6 +110,14 @@ " resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against" + " the fully qualified class name. These patterns are appended to \"" + ALWAYS_PARENT_FIRST_LOADER_PATTERNS.key() + "\"."); + @Documentation.Section(Documentation.Sections.EXPERT_CLASS_LOADING) + public static final ConfigOption FAIL_USER_CLASS_LOADING_METASPACE_OOM = ConfigOptions + .key("classloader.fail.on.metaspace.oom.error") Review comment: ```suggestion .key("classloader.fail-on-metaspace-oom-error") ``` ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java ## @@ -151,12 +163,31 @@ public URLClassLoader createClassLoader(URL[] libraryURLs) { classLoaderResolveOrder, libraryURLs, FlinkUserCodeClassLoaders.class.getClassLoader(), - alwaysParentFirstPatterns); + alwaysParentFirstPatterns, + classLoadingExceptionHandler); } } - public sta
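Note on the review comment about `defaultClassLoaderFactory()`: the suggestion is to pass only an exception handler instead of a boolean plus a handler, since `false` together with `exception -> {}` is redundant. A minimal sketch of that shape follows, assuming the handler is a `Consumer<Throwable>`. The class `HandlerSelection` and the methods `chooseHandler` and `loadWithHandler` are hypothetical names used for illustration and do not appear in the pull request.

```java
import java.util.function.Consumer;

// Hypothetical illustration of "pass the handler, not a flag plus a handler".
final class HandlerSelection {

    // Named after the NOOP_EXCEPTION_HANDLER suggestion in the review.
    static final Consumer<Throwable> NOOP_EXCEPTION_HANDLER = classLoadingException -> {};

    // The configuration flag is evaluated once by the caller; downstream code
    // then only needs the resulting handler.
    static Consumer<Throwable> chooseHandler(boolean failOnMetaspaceOom, Consumer<Throwable> fatalErrorHandler) {
        return failOnMetaspaceOom ? fatalErrorHandler : NOOP_EXCEPTION_HANDLER;
    }

    // Loads a class and reports any failure to the handler before rethrowing it.
    static Class<?> loadWithHandler(ClassLoader loader, String name, Consumer<Throwable> handler)
            throws ClassNotFoundException {
        try {
            return loader.loadClass(name);
        } catch (Throwable t) {
            handler.accept(t);
            throw t; // precise rethrow: ClassNotFoundException or an unchecked throwable
        }
    }

    public static void main(String[] args) throws Exception {
        Consumer<Throwable> handler = chooseHandler(false, t -> System.exit(1));
        System.out.println(loadWithHandler(HandlerSelection.class.getClassLoader(), "java.lang.String", handler));
    }
}
```

With this shape a test such as the one quoted above would pass a single `exception -> {}` argument.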
[jira] [Updated] (FLINK-16768) HadoopS3RecoverableWriterITCase.testRecoverWithStateWithMultiPart hangs
[ https://issues.apache.org/jira/browse/FLINK-16768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-16768: - Issue Type: Bug (was: Task) > HadoopS3RecoverableWriterITCase.testRecoverWithStateWithMultiPart hangs > --- > > Key: FLINK-16768 > URL: https://issues.apache.org/jira/browse/FLINK-16768 > Project: Flink > Issue Type: Bug > Components: FileSystems, Tests >Affects Versions: 1.10.0 >Reporter: Zhijiang >Priority: Major > Labels: test-stability > Fix For: 1.11.0 > > > Logs: > [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6584&view=logs&j=d44f43ce-542c-597d-bf94-b0718c71e5e8&t=d26b3528-38b0-53d2-05f7-37557c2405e4] > {code:java} > 2020-03-24T15:52:18.9196862Z "main" #1 prio=5 os_prio=0 > tid=0x7fd36c00b800 nid=0xc21 runnable [0x7fd3743ce000] > 2020-03-24T15:52:18.9197235Zjava.lang.Thread.State: RUNNABLE > 2020-03-24T15:52:18.9197536Z at > java.net.SocketInputStream.socketRead0(Native Method) > 2020-03-24T15:52:18.9197931Z at > java.net.SocketInputStream.socketRead(SocketInputStream.java:116) > 2020-03-24T15:52:18.9198340Z at > java.net.SocketInputStream.read(SocketInputStream.java:171) > 2020-03-24T15:52:18.9198749Z at > java.net.SocketInputStream.read(SocketInputStream.java:141) > 2020-03-24T15:52:18.9199171Z at > sun.security.ssl.InputRecord.readFully(InputRecord.java:465) > 2020-03-24T15:52:18.9199840Z at > sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593) > 2020-03-24T15:52:18.9200265Z at > sun.security.ssl.InputRecord.read(InputRecord.java:532) > 2020-03-24T15:52:18.9200663Z at > sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:975) > 2020-03-24T15:52:18.9201213Z - locked <0x927583d8> (a > java.lang.Object) > 2020-03-24T15:52:18.9201589Z at > sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:933) > 2020-03-24T15:52:18.9202026Z at > sun.security.ssl.AppInputStream.read(AppInputStream.java:105) > 2020-03-24T15:52:18.9202583Z - locked <0x92758c00> (a > 
sun.security.ssl.AppInputStream) > 2020-03-24T15:52:18.9203029Z at > org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137) > 2020-03-24T15:52:18.9203558Z at > org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:198) > 2020-03-24T15:52:18.9204121Z at > org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:176) > 2020-03-24T15:52:18.9204626Z at > org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135) > 2020-03-24T15:52:18.9205121Z at > com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) > 2020-03-24T15:52:18.9205679Z at > com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180) > 2020-03-24T15:52:18.9206164Z at > com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) > 2020-03-24T15:52:18.9206786Z at > com.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:125) > 2020-03-24T15:52:18.9207361Z at > com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) > 2020-03-24T15:52:18.9207839Z at > com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) > 2020-03-24T15:52:18.9208327Z at > com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) > 2020-03-24T15:52:18.9208809Z at > com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180) > 2020-03-24T15:52:18.9209273Z at > com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) > 2020-03-24T15:52:18.9210003Z at > com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:107) > 2020-03-24T15:52:18.9210658Z at > com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) > 2020-03-24T15:52:18.9211154Z at > org.apache.hadoop.fs.s3a.S3AInputStream.lambda$read$3(S3AInputStream.java:445) > 2020-03-24T15:52:18.9211631Z at > 
org.apache.hadoop.fs.s3a.S3AInputStream$$Lambda$42/1936375962.execute(Unknown > Source) > 2020-03-24T15:52:18.9212044Z at > org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109) > 2020-03-24T15:52:18.9212553Z at > org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260) > 2020-03-24T15:52:18.9212972Z at > org.apache.hadoop.fs.s3a.Invoker$$Lambda$23/1457226878.execute(Unknown Source) > 2020-03-24T15:52:18.9213408Z at > org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317) > 2020-03-24T15:52:18.9213866Z at > org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256) > 2020-03-24T15:52:18.9214273Z at > org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231) > 2020-03-24T15:52:18.9214701Z at
[jira] [Commented] (FLINK-18161) Changing parallelism is not possible in sql-client.sh
[ https://issues.apache.org/jira/browse/FLINK-18161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126882#comment-17126882 ] Jark Wu commented on FLINK-18161: - Could we just allow SQL CLI to set Flink configurations? This will also make it possible to set checkpointing/watermark in SQL CLI. > Changing parallelism is not possible in sql-client.sh > - > > Key: FLINK-18161 > URL: https://issues.apache.org/jira/browse/FLINK-18161 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.11.0 >Reporter: Robert Metzger >Priority: Critical > Fix For: 1.11.0 > > > I tried using > {code} > SET execution.parallelism=12 > {code} > and changing the parallelism in the configuration file. > My SQL queries were always running with p=1 for all operators. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-18096) Generated avro formats should support user specified name and namespace
[ https://issues.apache.org/jira/browse/FLINK-18096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126877#comment-17126877 ] Jark Wu commented on FLINK-18096: - This sounds like you will need an additional util to get avro Schema or schema string from TableSchema/Table. Like a reverse of FLINK-18158. > Generated avro formats should support user specified name and namespace > --- > > Key: FLINK-18096 > URL: https://issues.apache.org/jira/browse/FLINK-18096 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: Seth Wiesman >Priority: Major > > When avro schema is auto derived it should still be possible to specify > namespace and name.
[GitHub] [flink] flinkbot edited a comment on pull request #12476: [FLINK-17902][python] Support the new interfaces about temporary functions in PyFlink
flinkbot edited a comment on pull request #12476: URL: https://github.com/apache/flink/pull/12476#issuecomment-638622433 ## CI report: * 049d7d45991db1164ff983ec3fd6c0b23ad6f07f Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2817) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #12504: [FLINK-16350] Support Zookeeper 3.5 in test_ha_per_job_cluster_datastream.sh
flinkbot edited a comment on pull request #12504: URL: https://github.com/apache/flink/pull/12504#issuecomment-639551716 ## CI report: * 9e7ffdf88b132d86eba7dd516c9b16d1b705758a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2842) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[jira] [Comment Edited] (FLINK-18161) Changing parallelism is not possible in sql-client.sh
[ https://issues.apache.org/jira/browse/FLINK-18161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126875#comment-17126875 ] Dawid Wysakowicz edited comment on FLINK-18161 at 6/5/20, 3:25 PM: --- The simplest fix would be to overwrite the {{parallelism.default}} with the value of {{execution.parallelism}} (which is only sql-client specific). was (Author: dawidwys): The simplest fix would be to set the value of {{execution.parallelism}} to the {{parallelism.default}}. > Changing parallelism is not possible in sql-client.sh > - > > Key: FLINK-18161 > URL: https://issues.apache.org/jira/browse/FLINK-18161 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.11.0 >Reporter: Robert Metzger >Priority: Critical > Fix For: 1.11.0 > > > I tried using > {code} > SET execution.parallelism=12 > {code} > and changing the parallelism in the configuration file. > My SQL queries were always running with p=1 for all operators. -- This message was sent by Atlassian Jira (v8.3.4#803005)
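The fix proposed in this comment can be sketched as follows, assuming both the Flink configuration and the SQL client session properties are available as plain key/value maps. The class `ParallelismResolver` and the method `applySessionParallelism` are hypothetical names for illustration and are not taken from the Flink code base; only the two option keys come from the discussion.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the actual SQL client implementation.
final class ParallelismResolver {

    static final String SQL_CLIENT_KEY = "execution.parallelism"; // sql-client specific option
    static final String FLINK_KEY = "parallelism.default";        // Flink-wide default

    // Returns a copy of flinkConf in which parallelism.default is overwritten
    // with execution.parallelism whenever the session sets the latter.
    static Map<String, String> applySessionParallelism(
            Map<String, String> flinkConf, Map<String, String> sessionProps) {
        Map<String, String> effective = new HashMap<>(flinkConf);
        String sessionValue = sessionProps.get(SQL_CLIENT_KEY);
        if (sessionValue != null) {
            effective.put(FLINK_KEY, sessionValue);
        }
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> flinkConf = new HashMap<>();
        flinkConf.put(FLINK_KEY, "1");      // value from flink-conf.yaml
        Map<String, String> session = new HashMap<>();
        session.put(SQL_CLIENT_KEY, "12");  // SET execution.parallelism=12
        System.out.println(applySessionParallelism(flinkConf, session).get(FLINK_KEY)); // prints 12
    }
}
```

When the session does not set `execution.parallelism`, the value from the configuration file is left untouched.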
[jira] [Commented] (FLINK-18161) Changing parallelism is not possible in sql-client.sh
[ https://issues.apache.org/jira/browse/FLINK-18161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126875#comment-17126875 ] Dawid Wysakowicz commented on FLINK-18161: -- The simplest fix would be to set the value of {{execution.parallelism}} to the {{parallelism.default}}. > Changing parallelism is not possible in sql-client.sh > - > > Key: FLINK-18161 > URL: https://issues.apache.org/jira/browse/FLINK-18161 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.11.0 >Reporter: Robert Metzger >Priority: Critical > Fix For: 1.11.0 > > > I tried using > {code} > SET execution.parallelism=12 > {code} > and changing the parallelism in the configuration file. > My SQL queries were always running with p=1 for all operators. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-18161) Changing parallelism is not possible in sql-client.sh
[ https://issues.apache.org/jira/browse/FLINK-18161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126873#comment-17126873 ] Dawid Wysakowicz commented on FLINK-18161: -- {{parallelism.default}} works from {{flink-conf.yaml}}. There is no way to set it from the {{sql-client}}. {{execution.parallelism}} works if we remove the {{parallelism.default}} from {{flink-conf.yaml}}. > Changing parallelism is not possible in sql-client.sh > - > > Key: FLINK-18161 > URL: https://issues.apache.org/jira/browse/FLINK-18161 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.11.0 >Reporter: Robert Metzger >Priority: Critical > Fix For: 1.11.0 > > > I tried using > {code} > SET execution.parallelism=12 > {code} > and changing the parallelism in the configuration file. > My SQL queries were always running with p=1 for all operators. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #11586: [FLINK-5552][runtime] make JMXServer static per JVM
flinkbot edited a comment on pull request #11586: URL: https://github.com/apache/flink/pull/11586#issuecomment-606958289 ## CI report: * 613a5fd798911229926144246b94635cde46c6ed Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2309) * 0c0af972ac3f8477eba6c65cfe990a6af0105707 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2841)
[jira] [Commented] (FLINK-18096) Generated avro formats should support user specified name and namespace
[ https://issues.apache.org/jira/browse/FLINK-18096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126874#comment-17126874 ] Jark Wu commented on FLINK-18096: - I see. Thanks for the explanation [~sjwiesman]. > Generated avro formats should support user specified name and namespace > --- > > Key: FLINK-18096 > URL: https://issues.apache.org/jira/browse/FLINK-18096 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: Seth Wiesman >Priority: Major > > When avro schema is auto derived it should still be possible to specify > namespace and name. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-18161) Changing parallelism is not possible in sql-client.sh
[ https://issues.apache.org/jira/browse/FLINK-18161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-18161: Fix Version/s: 1.11.0 > Changing parallelism is not possible in sql-client.sh > - > > Key: FLINK-18161 > URL: https://issues.apache.org/jira/browse/FLINK-18161 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.11.0 >Reporter: Robert Metzger >Priority: Critical > Fix For: 1.11.0 > > > I tried using > {code} > SET execution.parallelism=12 > {code} > and changing the parallelism in the configuration file. > My SQL queries were always running with p=1 for all operators. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-18161) Changing parallelism is not possible in sql-client.sh
[ https://issues.apache.org/jira/browse/FLINK-18161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126872#comment-17126872 ] Jark Wu commented on FLINK-18161: - Does {{parallelism.default}} work? > Changing parallelism is not possible in sql-client.sh > - > > Key: FLINK-18161 > URL: https://issues.apache.org/jira/browse/FLINK-18161 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.11.0 >Reporter: Robert Metzger >Priority: Critical > Fix For: 1.11.0 > > > I tried using > {code} > SET execution.parallelism=12 > {code} > and changing the parallelism in the configuration file. > My SQL queries were always running with p=1 for all operators. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-18096) Generated avro formats should support user specified name and namespace
[ https://issues.apache.org/jira/browse/FLINK-18096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126868#comment-17126868 ] Seth Wiesman edited comment on FLINK-18096 at 6/5/20, 3:19 PM: --- I certainly want to see registry support, and I believe Kurt has been discussing it for 1.12 as well. This came up for me when I was working with a user earlier this week and we were using 1.11 to write avro data to Kafka. We then had a DataStream application that was consuming from that topic and manually created an avro schema based on our DDL. Originally we wanted to use a specific record but were forced to use generic record based on the above-mentioned Java namespace issue. was (Author: sjwiesman): I certainly want to see registry support in 1.12, and I believe Kurt has been discussing that as well. This came up for me when I was working with a user earlier this week and we were using 1.11 to write avro data to Kafka. We then had a DataStream application that was consuming from that topic and manually created an avro schema based on our DDL. Originally we wanted to use a specific record but were forced to use generic record based on the above-mentioned Java namespace issue. > Generated avro formats should support user specified name and namespace > --- > > Key: FLINK-18096 > URL: https://issues.apache.org/jira/browse/FLINK-18096 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: Seth Wiesman >Priority: Major > > When avro schema is auto derived it should still be possible to specify > namespace and name. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-18096) Generated avro formats should support user specified name and namespace
[ https://issues.apache.org/jira/browse/FLINK-18096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126868#comment-17126868 ] Seth Wiesman commented on FLINK-18096: -- I certainly want to see registry support in 1.12, and I believe Kurt has been discussing that as well. This came up for me when I was working with a user earlier this week and we were using 1.11 to write avro data to Kafka. We then had a DataStream application that was consuming from that topic and manually created an avro schema based on our DDL. Originally we wanted to use a specific record but were forced to use generic record based on the above-mentioned Java namespace issue. > Generated avro formats should support user specified name and namespace > --- > > Key: FLINK-18096 > URL: https://issues.apache.org/jira/browse/FLINK-18096 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: Seth Wiesman >Priority: Major > > When avro schema is auto derived it should still be possible to specify > namespace and name. -- This message was sent by Atlassian Jira (v8.3.4#803005)