[GitHub] [flink] flinkbot edited a comment on pull request #12507: [FLINK-18162][connector/common] Serialize the splits in the AddSplitsEvent

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12507:
URL: https://github.com/apache/flink/pull/12507#issuecomment-639959577


   
   ## CI report:
   
   * 90d421fc94e29c0af0fa563661b9dd8f2a388b1b Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2855)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12256:
URL: https://github.com/apache/flink/pull/12256#issuecomment-631025695


   
   ## CI report:
   
   * ca05fc241f6fd0550a39612b4ddef86b63a304a6 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2625)
 
   * f181cf943442db6173eba4c817f40bbab49538da Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2857)
 
   * eba9c31264c2a0fe9f5bf955ba21de081bd1aa25 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12303: [FLINK-17625] [table] Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12303:
URL: https://github.com/apache/flink/pull/12303#issuecomment-633242378


   
   ## CI report:
   
   * 1c92b48de9d7b9068a646053e7dc9b4d076625ab Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2854)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12508: [hotfix][state] Add error message to precondition in KeyGroupPartitio…

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12508:
URL: https://github.com/apache/flink/pull/12508#issuecomment-639985108


   
   ## CI report:
   
   * 8953e51d845b3a61608d6e08f9caf462e43775bc Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2856)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12256:
URL: https://github.com/apache/flink/pull/12256#issuecomment-631025695


   
   ## CI report:
   
   * ca05fc241f6fd0550a39612b4ddef86b63a304a6 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2625)
 
   * f181cf943442db6173eba4c817f40bbab49538da UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zhuzhurk commented on a change in pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling

2020-06-05 Thread GitBox


zhuzhurk commented on a change in pull request #12256:
URL: https://github.com/apache/flink/pull/12256#discussion_r436239625



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java
##
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This slot allocator will request one physical slot for each single 
execution vertex.
+ * The slots will be requested in bulks so that the {@link SlotProvider} can 
check
+ * whether this bulk of slot requests can be fulfilled at the same time.
+ * It has several limitations:
+ *
+ * 1. Slot sharing will be ignored.
+ *
+ * 2. Co-location constraints are not allowed.
+ *
+ * 3. Intra-bulk input location preferences will be ignored.
+ */
+public class OneSlotPerExecutionSlotAllocator extends 
AbstractExecutionSlotAllocator {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OneSlotPerExecutionSlotAllocator.class);
+
+   private final SlotProvider slotProvider;
+
+   private final SlotOwner slotOwner;
+
+   private final boolean slotWillBeOccupiedIndefinitely;
+
+   private final Time allocationTimeout;
+
+   public OneSlotPerExecutionSlotAllocator(
+   final SlotProvider slotProvider,
+   final InputsLocationsRetriever inputsLocationsRetriever,
+   final boolean slotWillBeOccupiedIndefinitely,
+   final Time allocationTimeout) {
+
+   super(inputsLocationsRetriever);
+
+   this.slotProvider = checkNotNull(slotProvider);
+   this.slotWillBeOccupiedIndefinitely = 
slotWillBeOccupiedIndefinitely;
+   this.allocationTimeout = checkNotNull(allocationTimeout);
+
+   this.slotOwner = new 
OneSlotPerExecutionSlotAllocatorSlotOwner();
+   }
+
+   @Override
+   public List<SlotExecutionVertexAssignment> allocateSlotsFor(
+           final List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
+
+           validateSchedulingRequirements(executionVertexSchedulingRequirements);
+
+           validateNoCoLocationConstraint(executionVertexSchedulingRequirements);
+
+           final Set<ExecutionVertexID> allExecutionVertexIds = executionVertexSchedulingRequirements.stream()
+                   .map(ExecutionVertexSchedulingRequirements::getExecutionVertexId)
+                   .collect(Collectors.toSet());
+
+           final Map<ExecutionVertexID, SlotRequestId> executionVertexSlotRequestIds =
+                   generateExecutionVertexSlotRequestIds(allExecutionVertexIds);
+
+           final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
+                   createSlotExecutionVertexAssignments(executionVertexSchedulingReq

[GitHub] [flink] zhuzhurk commented on a change in pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling

2020-06-05 Thread GitBox


zhuzhurk commented on a change in pull request #12256:
URL: https://github.com/apache/flink/pull/12256#discussion_r436239133



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
##
@@ -200,14 +203,16 @@ private SlotExecutionVertexAssignment 
findSlotAssignmentByExecutionVertexId(
executionVertexId)));
}
 
-   private static class AllocationToggableSlotProvider implements 
SlotProvider {
+   static class AllocationToggableSlotProvider implements SlotProvider {
 
private final List<Tuple3<SlotRequestId, ScheduledUnit, SlotProfile>> slotAllocationRequests = new ArrayList<>();

private final List<SlotRequestId> cancelledSlotRequestIds = new ArrayList<>();
 
private boolean slotAllocationDisabled;
 
+   private boolean forceFailingSlotAllocation = false;

Review comment:
   `OneSlotPerExecutionSlotAllocatorTest` now has its own 
`TestingBulkSlotProvider` which does not have 3 modes.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java
##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Factory for {@link OneSlotPerExecutionSlotAllocator}.
+ */
+public class OneSlotPerExecutionSlotAllocatorFactory implements 
ExecutionSlotAllocatorFactory {
+
+   private final SlotProvider slotProvider;
+
+   private final boolean slotWillBeOccupiedIndefinitely;
+
+   private final Time allocationTimeout;
+
+   public OneSlotPerExecutionSlotAllocatorFactory(

Review comment:
   done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zhuzhurk commented on a change in pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling

2020-06-05 Thread GitBox


zhuzhurk commented on a change in pull request #12256:
URL: https://github.com/apache/flink/pull/12256#discussion_r436239010



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
##
@@ -189,7 +192,7 @@ private DefaultExecutionSlotAllocator 
createExecutionSlotAllocator(InputsLocatio
return schedulingRequirements;
}
 
-   private SlotExecutionVertexAssignment 
findSlotAssignmentByExecutionVertexId(
+   static SlotExecutionVertexAssignment 
findSlotAssignmentByExecutionVertexId(

Review comment:
   Agreed. Added util classes in 216a75a34f2d090d726673dc272ea2801fa6eef8 
and 277ee720dee5ab63dade29c7178d9e7af69a3f21





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zhuzhurk commented on a change in pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling

2020-06-05 Thread GitBox


zhuzhurk commented on a change in pull request #12256:
URL: https://github.com/apache/flink/pull/12256#discussion_r436238944



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
##
@@ -216,13 +221,37 @@ private SlotExecutionVertexAssignment 
findSlotAssignmentByExecutionVertexId(
Time timeout) {
 
slotAllocationRequests.add(Tuple3.of(slotRequestId, 
task, slotProfile));
-   if (slotAllocationDisabled) {
+   if (forceFailingSlotAllocation) {
+   return FutureUtils.completedExceptionally(new 
Exception("Forced failure"));
+   } else if (slotAllocationDisabled) {
return new CompletableFuture<>();
} else {
return CompletableFuture.completedFuture(new 
TestingLogicalSlotBuilder().createTestingLogicalSlot());
}
}
 
+   public CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots(

Review comment:
   Yes, `OneSlotPerExecutionSlotAllocatorTest` is reworked and does not use 
`AllocationToggableBulkSlotProvider` anymore.

##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
##
@@ -216,13 +221,37 @@ private SlotExecutionVertexAssignment 
findSlotAssignmentByExecutionVertexId(
Time timeout) {
 
slotAllocationRequests.add(Tuple3.of(slotRequestId, 
task, slotProfile));
-   if (slotAllocationDisabled) {
+   if (forceFailingSlotAllocation) {
+   return FutureUtils.completedExceptionally(new 
Exception("Forced failure"));
+   } else if (slotAllocationDisabled) {
return new CompletableFuture<>();
} else {
return CompletableFuture.completedFuture(new 
TestingLogicalSlotBuilder().createTestingLogicalSlot());
}
}
 
+   public CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots(

Review comment:
   Yes, `OneSlotPerExecutionSlotAllocatorTest` is reworked now and does not 
use `AllocationToggableBulkSlotProvider` anymore.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zhuzhurk commented on a change in pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling

2020-06-05 Thread GitBox


zhuzhurk commented on a change in pull request #12256:
URL: https://github.com/apache/flink/pull/12256#discussion_r436238809



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java
##
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This slot allocator will request one physical slot for each single 
execution vertex.
+ * The slots will be requested in bulks so that the {@link SlotProvider} can 
check
+ * whether this bulk of slot requests can be fulfilled at the same time.
+ * It has several limitations:
+ *
+ * 1. Slot sharing will be ignored.
+ *
+ * 2. Co-location constraints are not allowed.
+ *
+ * 3. Intra-bulk input location preferences will be ignored.
+ */
+public class OneSlotPerExecutionSlotAllocator extends 
AbstractExecutionSlotAllocator {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OneSlotPerExecutionSlotAllocator.class);
+
+   private final SlotProvider slotProvider;
+
+   private final SlotOwner slotOwner;
+
+   private final boolean slotWillBeOccupiedIndefinitely;
+
+   private final Time allocationTimeout;
+
+   public OneSlotPerExecutionSlotAllocator(
+   final SlotProvider slotProvider,

Review comment:
   It's done to make `OneSlotPerExecutionSlotAllocator` depend on 
`BulkSlotProvider`.
   I think `cancelSlotRequest` may be better because 
`SlotPoolImpl#releaseSlot()` also cancels pending physical slot request.
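
   For context, the bulk-style provider being discussed could look roughly like the sketch below. The method names, parameters, and the nested `Result` type are assumptions drawn from this discussion and FLINK-17017, not necessarily the merged interface:

```java
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/**
 * Sketch of a bulk slot provider: all physical slot requests of one scheduling bulk
 * are handed over together, so the provider can check whether the whole bulk can be
 * fulfilled and can time out or cancel the requests as a unit.
 */
interface BulkSlotProviderSketch {

    /** Requests all physical slots of the bulk at once. */
    CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots(
            Collection<PhysicalSlotRequest> physicalSlotRequests,
            Time timeout);

    /** Cancels a single pending request, e.g. when the bulk cannot be fulfilled in time. */
    void cancelSlotRequest(SlotRequestId slotRequestId, Throwable cause);
}
```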

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java
##
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotPro

[GitHub] [flink] zhuzhurk commented on a change in pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling

2020-06-05 Thread GitBox


zhuzhurk commented on a change in pull request #12256:
URL: https://github.com/apache/flink/pull/12256#discussion_r436238884



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java
##
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This slot allocator will request one physical slot for each single 
execution vertex.
+ * The slots will be requested in bulks so that the {@link SlotProvider} can 
check
+ * whether this bulk of slot requests can be fulfilled at the same time.
+ * It has several limitations:
+ *
+ * 1. Slot sharing will be ignored.
+ *
+ * 2. Co-location constraints are not allowed.
+ *
+ * 3. Intra-bulk input location preferences will be ignored.
+ */
+public class OneSlotPerExecutionSlotAllocator extends 
AbstractExecutionSlotAllocator {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OneSlotPerExecutionSlotAllocator.class);
+
+   private final SlotProvider slotProvider;
+
+   private final SlotOwner slotOwner;
+
+   private final boolean slotWillBeOccupiedIndefinitely;
+
+   private final Time allocationTimeout;
+
+   public OneSlotPerExecutionSlotAllocator(
+   final SlotProvider slotProvider,
+   final InputsLocationsRetriever inputsLocationsRetriever,
+   final boolean slotWillBeOccupiedIndefinitely,
+   final Time allocationTimeout) {
+
+   super(inputsLocationsRetriever);
+
+   this.slotProvider = checkNotNull(slotProvider);
+   this.slotWillBeOccupiedIndefinitely = 
slotWillBeOccupiedIndefinitely;
+   this.allocationTimeout = checkNotNull(allocationTimeout);
+
+   this.slotOwner = new 
OneSlotPerExecutionSlotAllocatorSlotOwner();
+   }
+
+   @Override
+   public List<SlotExecutionVertexAssignment> allocateSlotsFor(
+           final List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
+
+           validateSchedulingRequirements(executionVertexSchedulingRequirements);
+
+           validateNoCoLocationConstraint(executionVertexSchedulingRequirements);
+
+           final Set<ExecutionVertexID> allExecutionVertexIds = executionVertexSchedulingRequirements.stream()
+                   .map(ExecutionVertexSchedulingRequirements::getExecutionVertexId)
+                   .collect(Collectors.toSet());
+
+           final Map<ExecutionVertexID, SlotRequestId> executionVertexSlotRequestIds =
+                   generateExecutionVertexSlotRequestIds(allExecutionVertexIds);
+
+           final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
+                   createSlotExecutionVertexAssignments(executionVertexSchedulingReq

[GitHub] [flink] flinkbot commented on pull request #12508: [hotfix][state] Add error message to precondition in KeyGroupPartitio…

2020-06-05 Thread GitBox


flinkbot commented on pull request #12508:
URL: https://github.com/apache/flink/pull/12508#issuecomment-639985108


   
   ## CI report:
   
   * 8953e51d845b3a61608d6e08f9caf462e43775bc UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zhuzhurk commented on a change in pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling

2020-06-05 Thread GitBox


zhuzhurk commented on a change in pull request #12256:
URL: https://github.com/apache/flink/pull/12256#discussion_r436238809



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java
##
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This slot allocator will request one physical slot for each single 
execution vertex.
+ * The slots will be requested in bulks so that the {@link SlotProvider} can 
check
+ * whether this bulk of slot requests can be fulfilled at the same time.
+ * It has several limitations:
+ *
+ * 1. Slot sharing will be ignored.
+ *
+ * 2. Co-location constraints are not allowed.
+ *
+ * 3. Intra-bulk input location preferences will be ignored.
+ */
+public class OneSlotPerExecutionSlotAllocator extends 
AbstractExecutionSlotAllocator {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OneSlotPerExecutionSlotAllocator.class);
+
+   private final SlotProvider slotProvider;
+
+   private final SlotOwner slotOwner;
+
+   private final boolean slotWillBeOccupiedIndefinitely;
+
+   private final Time allocationTimeout;
+
+   public OneSlotPerExecutionSlotAllocator(
+   final SlotProvider slotProvider,

Review comment:
   It's done to make `OneSlotPerExecutionSlotAllocator` depend on 
`BulkSlotProvider`.
   I think `cancelSlotRequest` may be better because it also cancels pending 
physical slot request.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zhuzhurk commented on a change in pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling

2020-06-05 Thread GitBox


zhuzhurk commented on a change in pull request #12256:
URL: https://github.com/apache/flink/pull/12256#discussion_r436237778



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java
##
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This slot allocator will request one physical slot for each single 
execution vertex.
+ * The slots will be requested in bulks so that the {@link SlotProvider} can 
check
+ * whether this bulk of slot requests can be fulfilled at the same time.
+ * It has several limitations:
+ *
+ * 1. Slot sharing will be ignored.
+ *
+ * 2. Co-location constraints are not allowed.
+ *
+ * 3. Intra-bulk input location preferences will be ignored.
+ */
+public class OneSlotPerExecutionSlotAllocator extends 
AbstractExecutionSlotAllocator {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OneSlotPerExecutionSlotAllocator.class);
+
+   private final SlotProvider slotProvider;
+
+   private final SlotOwner slotOwner;
+
+   private final boolean slotWillBeOccupiedIndefinitely;
+
+   private final Time allocationTimeout;
+
+   public OneSlotPerExecutionSlotAllocator(
+   final SlotProvider slotProvider,
+   final InputsLocationsRetriever inputsLocationsRetriever,
+   final boolean slotWillBeOccupiedIndefinitely,
+   final Time allocationTimeout) {
+
+   super(inputsLocationsRetriever);
+
+   this.slotProvider = checkNotNull(slotProvider);
+   this.slotWillBeOccupiedIndefinitely = 
slotWillBeOccupiedIndefinitely;
+   this.allocationTimeout = checkNotNull(allocationTimeout);
+
+   this.slotOwner = new 
OneSlotPerExecutionSlotAllocatorSlotOwner();
+   }
+
+   @Override
+   public List<SlotExecutionVertexAssignment> allocateSlotsFor(
+           final List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
+
+           validateSchedulingRequirements(executionVertexSchedulingRequirements);
+
+           validateNoCoLocationConstraint(executionVertexSchedulingRequirements);
+
+           final Set<ExecutionVertexID> allExecutionVertexIds = executionVertexSchedulingRequirements.stream()
+                   .map(ExecutionVertexSchedulingRequirements::getExecutionVertexId)
+                   .collect(Collectors.toSet());
+
+           final Map<ExecutionVertexID, SlotRequestId> executionVertexSlotRequestIds =
+                   generateExecutionVertexSlotRequestIds(allExecutionVertexIds);
+
+           final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
+                   createSlotExecutionVertexAssignments(executionVertexSchedulingReq

[GitHub] [flink] Tartarus0zm commented on pull request #12508: [hotfix][state] Add error message to precondition in KeyGroupPartitio…

2020-06-05 Thread GitBox


Tartarus0zm commented on pull request #12508:
URL: https://github.com/apache/flink/pull/12508#issuecomment-639981987


   @zentol can you review this for me if you have time, thanks ~



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wuchong commented on pull request #12508: [hotfix][state] Add error message to precondition in KeyGroupPartitio…

2020-06-05 Thread GitBox


wuchong commented on pull request #12508:
URL: https://github.com/apache/flink/pull/12508#issuecomment-639979473


   cc @carp84 who is more familiar with state module. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #12508: [hotfix][state] Add error message to precondition in KeyGroupPartitio…

2020-06-05 Thread GitBox


flinkbot commented on pull request #12508:
URL: https://github.com/apache/flink/pull/12508#issuecomment-639978998


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 8953e51d845b3a61608d6e08f9caf462e43775bc (Sat Jun 06 
04:26:18 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied
according to the order of the review items. For consensus, approval by a Flink
committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] Tartarus0zm commented on pull request #12508: [hotfix][state] Add error message to precondition in KeyGroupPartitio…

2020-06-05 Thread GitBox


Tartarus0zm commented on pull request #12508:
URL: https://github.com/apache/flink/pull/12508#issuecomment-639978848


   @wuchong can you review this for me if you have time, thanks ~



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] Tartarus0zm opened a new pull request #12508: [hotfix][state] Add error message to precondition in KeyGroupPartitio…

2020-06-05 Thread GitBox


Tartarus0zm opened a new pull request #12508:
URL: https://github.com/apache/flink/pull/12508


   ## What is the purpose of the change
   
   Add an error message to the precondition in KeyGroupPartitionedPriorityQueue
   to avoid unclear exception output like the following:
   ```
   java.lang.ArrayIndexOutOfBoundsException: -49
   at 
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:174)
at 
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:110)
at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerProcessingTimeTimer(InternalTimerServiceImpl.java:203)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerProcessingTimeTimer(WindowOperator.java:901)
at 
org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:36)
at 
org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:28)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:920)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:402)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.processRecord(OneInputStreamTask.java:204)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:196)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:745)
   ```
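
   For illustration, the kind of precondition this hotfix is about could look like the sketch below. The helper, the range check, and the message wording are assumptions for illustration, not the actual patch; it only shows how a descriptive `checkArgument` replaces the bare array access that produced the exception above.

   ```java
   import org.apache.flink.runtime.state.KeyGroupRange;

   import static org.apache.flink.util.Preconditions.checkArgument;

   final class KeyGroupIndexingSketch {

       private KeyGroupIndexingSketch() {
       }

       /**
        * Maps a global key group to an index into the locally owned sub-structures,
        * failing with a descriptive message instead of a bare ArrayIndexOutOfBoundsException.
        */
       static int localIndex(KeyGroupRange localRange, int keyGroup) {
           checkArgument(
               localRange.contains(keyGroup),
               "Key group %s is not in the locally owned key group range %s. "
                   + "This usually means the key's key group was computed against a different max parallelism.",
               keyGroup,
               localRange);
           return keyGroup - localRange.getStartKeyGroup();
       }
   }
   ```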
   
   ## Brief change log
   
   no
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12507: [FLINK-18162][connector/common] Serialize the splits in the AddSplitsEvent

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12507:
URL: https://github.com/apache/flink/pull/12507#issuecomment-639959577


   
   ## CI report:
   
   * 90d421fc94e29c0af0fa563661b9dd8f2a388b1b Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2855)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #12507: [FLINK-18162][connector/common] Serialize the splits in the AddSplitsEvent

2020-06-05 Thread GitBox


flinkbot commented on pull request #12507:
URL: https://github.com/apache/flink/pull/12507#issuecomment-639959577


   
   ## CI report:
   
   * 90d421fc94e29c0af0fa563661b9dd8f2a388b1b UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12303: [FLINK-17625] [table] Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12303:
URL: https://github.com/apache/flink/pull/12303#issuecomment-633242378


   
   ## CI report:
   
   * 3f6b52073802835bc193796a41d095c5670f5d59 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2081)
 
   * 1c92b48de9d7b9068a646053e7dc9b4d076625ab Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2854)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] becketqin commented on pull request #12507: [FLINK-18162][connector/common] Serialize the splits in the AddSplitsEvent

2020-06-05 Thread GitBox


becketqin commented on pull request #12507:
URL: https://github.com/apache/flink/pull/12507#issuecomment-639958697


   @StephanEwen Could you help take a look? Thanks!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #12507: [FLINK-18162][connector/common] Serialize the splits in the AddSplitsEvent

2020-06-05 Thread GitBox


flinkbot commented on pull request #12507:
URL: https://github.com/apache/flink/pull/12507#issuecomment-639957918


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 90d421fc94e29c0af0fa563661b9dd8f2a388b1b (Sat Jun 06 
02:16:13 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied
according to the order of the review items. For consensus, approval by a Flink
committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] becketqin opened a new pull request #12507: [FLINK-18162][connector/common] Serialize the splits in the AddSplitsEvent

2020-06-05 Thread GitBox


becketqin opened a new pull request #12507:
URL: https://github.com/apache/flink/pull/12507


   ## What is the purpose of the change
   The `AddSplitsEvent` contains a list of `SourceSplit` to be sent from the
source coordinator to the source reader. Currently the `AddSplitsEvent` is
serializable, but the `SourceSplit` is not, because we expect users to provide
a split serializer. To make sure the `AddSplitsEvent` is actually serializable,
we need to use the user-provided split serializer and store the serialized
splits in the `AddSplitsEvent` instead of the split objects.
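
   For illustration, the idea could be sketched as below, assuming the user-provided split serializer follows Flink's `SimpleVersionedSerializer` contract. The class and method names are illustrative, not the exact change in this PR:

   ```java
   import org.apache.flink.core.io.SimpleVersionedSerializer;

   import java.io.IOException;
   import java.io.Serializable;
   import java.util.ArrayList;
   import java.util.List;

   /**
    * Sketch: the event keeps the user-serialized form of each split plus the serializer
    * version, so the event itself is Serializable even though SourceSplit implementations
    * are not required to be.
    */
   class AddSplitsEventSketch<SplitT> implements Serializable {

       private final int serializerVersion;
       private final ArrayList<byte[]> serializedSplits;

       AddSplitsEventSketch(List<SplitT> splits, SimpleVersionedSerializer<SplitT> splitSerializer) throws IOException {
           this.serializerVersion = splitSerializer.getVersion();
           this.serializedSplits = new ArrayList<>(splits.size());
           for (SplitT split : splits) {
               // Serialize on the coordinator side, before the event is shipped to the reader.
               serializedSplits.add(splitSerializer.serialize(split));
           }
       }

       List<SplitT> splits(SimpleVersionedSerializer<SplitT> splitSerializer) throws IOException {
           // Deserialize on the reader side with the same user-provided serializer.
           final List<SplitT> splits = new ArrayList<>(serializedSplits.size());
           for (byte[] serializedSplit : serializedSplits) {
               splits.add(splitSerializer.deserialize(serializerVersion, serializedSplit));
           }
           return splits;
       }
   }
   ```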
   
   ## Brief change log
   90d421fc94e29c0af0fa563661b9dd8f2a388b1b changes the `AddSplitsEvent` to
store the serialized splits instead of the original split instances.
   
   ## Verifying this change
   Existing unit tests cover the change.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12303: [FLINK-17625] [table] Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12303:
URL: https://github.com/apache/flink/pull/12303#issuecomment-633242378


   
   ## CI report:
   
   * 3f6b52073802835bc193796a41d095c5670f5d59 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2081)
 
   * 1c92b48de9d7b9068a646053e7dc9b4d076625ab UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] lsyldliu commented on pull request #12303: [FLINK-17625] [table] Fix ArrayIndexOutOfBoundsException in AppendOnlyTopNFunction

2020-06-05 Thread GitBox


lsyldliu commented on pull request #12303:
URL: https://github.com/apache/flink/pull/12303#issuecomment-639951595


   @wuchong Thanks for your review and I've addressed your comments.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] TengHu commented on pull request #12297: [FLINK-12855] [streaming-java][window-assigners] Add functionality that staggers panes on partitions to distribute workload.

2020-06-05 Thread GitBox


TengHu commented on pull request #12297:
URL: https://github.com/apache/flink/pull/12297#issuecomment-639894888


   @aljoscha Gentle ping on this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12506: [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12506:
URL: https://github.com/apache/flink/pull/12506#issuecomment-639676996


   
   ## CI report:
   
   * a6e3fc1886259cb1b7efede2596c9b581df06099 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2851)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12484: [FLINK-18127][tests] Streamline manual execution of Java E2E tests

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12484:
URL: https://github.com/apache/flink/pull/12484#issuecomment-638836075


   
   ## CI report:
   
   * 48ca964fb1f2350c6335ab4726553f38a4a4c1c0 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2850)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12423: [FLINK-18034][runtime] Introduce PreferredLocationsRetriever

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12423:
URL: https://github.com/apache/flink/pull/12423#issuecomment-636712044


   
   ## CI report:
   
   * 979c16df9319f87d4b96ff32d7e55823d8e41551 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2849)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12375: [FLINK-17017][runtime] Implements bulk allocation for physical slots

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12375:
URL: https://github.com/apache/flink/pull/12375#issuecomment-635184560


   
   ## CI report:
   
   * cd3fc98c034fdc61235d9109c05b4f55d7423746 UNKNOWN
   * a8902557952ea70746d27e88a392c74724784605 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2848)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12460: [FLINK-18063][checkpointing] Fix the race condition for aborting current checkpoint in CheckpointBarrierUnaligner

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12460:
URL: https://github.com/apache/flink/pull/12460#issuecomment-638102290


   
   ## CI report:
   
   * 1d838c03d2b9f9744cae6fc03a919db72cc0efd9 UNKNOWN
   * 5729182fd15a074bebba7f878113781c2b119fd3 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2846)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12505: [FLINK-17765] Simplify Flink stack traces

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12505:
URL: https://github.com/apache/flink/pull/12505#issuecomment-639597366


   
   ## CI report:
   
   * 9e1468b9780de1ceecae32707a836f05561a14b1 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2844)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12423: [FLINK-18034][runtime] Introduce PreferredLocationsRetriever

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12423:
URL: https://github.com/apache/flink/pull/12423#issuecomment-636712044


   
   ## CI report:
   
   * 7a2dfb63f296d835193c2ded3ec2f1f868eaf92a Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2845)
 
   * 979c16df9319f87d4b96ff32d7e55823d8e41551 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2849)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12504: [FLINK-16350] Support Zookeeper 3.5 in test_ha_per_job_cluster_datastream.sh

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12504:
URL: https://github.com/apache/flink/pull/12504#issuecomment-639551716


   
   ## CI report:
   
   * 9e7ffdf88b132d86eba7dd516c9b16d1b705758a Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2842)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12503: [FLINK-18151][python] Resolve CWE22 problems in pyflink_gateway_server.py

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12503:
URL: https://github.com/apache/flink/pull/12503#issuecomment-639444578


   
   ## CI report:
   
   * ce4f8beceb4672b8daa260f1b484a5790b0ad719 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2839)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12446: [FLINK-16225] Implement user class loading exception handler

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12446:
URL: https://github.com/apache/flink/pull/12446#issuecomment-637647665


   
   ## CI report:
   
   * 426791910cdcdc50824687d842a182f4036dbfc1 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2843)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11586: [FLINK-5552][runtime] make JMXServer static per JVM

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #11586:
URL: https://github.com/apache/flink/pull/11586#issuecomment-606958289


   
   ## CI report:
   
   * 0c0af972ac3f8477eba6c65cfe990a6af0105707 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2841)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12484: [FLINK-18127][tests] Streamline manual execution of Java E2E tests

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12484:
URL: https://github.com/apache/flink/pull/12484#issuecomment-638836075


   
   ## CI report:
   
   * 683c24369ba03611b096aad5c7e9d634c131549b Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2760)
 
   * 48ca964fb1f2350c6335ab4726553f38a4a4c1c0 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2850)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12506: [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12506:
URL: https://github.com/apache/flink/pull/12506#issuecomment-639676996


   
   ## CI report:
   
   * a6e3fc1886259cb1b7efede2596c9b581df06099 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2851)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Reopened] (FLINK-17260) StreamingKafkaITCase failure on Azure

2020-06-05 Thread Robert Metzger (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger reopened FLINK-17260:


failed again on master (including your commit): 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=2834&view=logs&s=ae4f8708-9994-57d3-c2d7-b892156e7812&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee

> StreamingKafkaITCase failure on Azure
> -
>
> Key: FLINK-17260
> URL: https://issues.apache.org/jira/browse/FLINK-17260
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Tests
>Affects Versions: 1.11.0, 1.12.0
>Reporter: Roman Khachatryan
>Assignee: Aljoscha Krettek
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
>
> [https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_apis/build/builds/7544/logs/165]
>  
> {code:java}
> 2020-04-16T00:12:32.2848429Z [INFO] Running 
> org.apache.flink.tests.util.kafka.StreamingKafkaITCase
> 2020-04-16T00:14:47.9100927Z [ERROR] Tests run: 3, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 135.621 s <<< FAILURE! - in 
> org.apache.flink.tests.util.kafka.StreamingKafkaITCase
> 2020-04-16T00:14:47.9103036Z [ERROR] testKafka[0: 
> kafka-version:0.10.2.0](org.apache.flink.tests.util.kafka.StreamingKafkaITCase)
>   Time elapsed: 46.222 s  <<<  FAILURE!
> 2020-04-16T00:14:47.9104033Z java.lang.AssertionError: 
> expected:<[elephant,27,64213]> but was:<[]>
> 2020-04-16T00:14:47.9104638Zat org.junit.Assert.fail(Assert.java:88)
> 2020-04-16T00:14:47.9105148Zat 
> org.junit.Assert.failNotEquals(Assert.java:834)
> 2020-04-16T00:14:47.9105701Zat 
> org.junit.Assert.assertEquals(Assert.java:118)
> 2020-04-16T00:14:47.9106239Zat 
> org.junit.Assert.assertEquals(Assert.java:144)
> 2020-04-16T00:14:47.9107177Zat 
> org.apache.flink.tests.util.kafka.StreamingKafkaITCase.testKafka(StreamingKafkaITCase.java:162)
> 2020-04-16T00:14:47.9107845Zat 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-04-16T00:14:47.9108434Zat 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-04-16T00:14:47.9109318Zat 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-04-16T00:14:47.9109914Zat 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-04-16T00:14:47.9110434Zat 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-04-16T00:14:47.9110985Zat 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-04-16T00:14:47.9111548Zat 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-04-16T00:14:47.9112083Zat 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-04-16T00:14:47.9112629Zat 
> org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-16T00:14:47.9113145Zat 
> org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-04-16T00:14:47.9113637Zat 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 2020-04-16T00:14:47.9114072Zat 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2020-04-16T00:14:47.9114490Zat 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2020-04-16T00:14:47.9115256Zat 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2020-04-16T00:14:47.9115791Zat 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2020-04-16T00:14:47.9116292Zat 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-04-16T00:14:47.9116736Zat 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-04-16T00:14:47.9117779Zat 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-04-16T00:14:47.9118274Zat 
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-04-16T00:14:47.9118766Zat 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2020-04-16T00:14:47.9119204Zat 
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 2020-04-16T00:14:47.9119625Zat 
> org.junit.runners.Suite.runChild(Suite.java:128)
> 2020-04-16T00:14:47.9120005Zat 
> org.junit.runners.Suite.runChild(Suite.java:27)
> 2020-04-16T00:14:47.9120428Zat 
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-04-16T00:14:47.9120876Zat 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-04-16T00:14:47.9121350Zat 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-04-16T00:14:47.9121805Zat 
> org.junit.runners.Parent

[GitHub] [flink] flinkbot edited a comment on pull request #12484: [FLINK-18127][tests] Streamline manual execution of Java E2E tests

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12484:
URL: https://github.com/apache/flink/pull/12484#issuecomment-638836075


   
   ## CI report:
   
   * 683c24369ba03611b096aad5c7e9d634c131549b Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2760)
 
   * 48ca964fb1f2350c6335ab4726553f38a4a4c1c0 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12423: [FLINK-18034][runtime] Introduce PreferredLocationsRetriever

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12423:
URL: https://github.com/apache/flink/pull/12423#issuecomment-636712044


   
   ## CI report:
   
   * 362a6480b17c1133aae420e09518b08482edf7e7 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2515)
 
   * 7a2dfb63f296d835193c2ded3ec2f1f868eaf92a Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2845)
 
   * 979c16df9319f87d4b96ff32d7e55823d8e41551 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2849)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12473: [FLINK-18061] [table] TableResult#collect method should return a closeable iterator to avoid resource leak

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12473:
URL: https://github.com/apache/flink/pull/12473#issuecomment-638574481


   
   ## CI report:
   
   * 6ffb0ddcf8041c9e064e05fab3a9e1747c41820f Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2836)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #12506: [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.

2020-06-05 Thread GitBox


flinkbot commented on pull request #12506:
URL: https://github.com/apache/flink/pull/12506#issuecomment-639676996


   
   ## CI report:
   
   * a6e3fc1886259cb1b7efede2596c9b581df06099 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12375: [FLINK-17017][runtime] Implements bulk allocation for physical slots

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12375:
URL: https://github.com/apache/flink/pull/12375#issuecomment-635184560


   
   ## CI report:
   
   * cd3fc98c034fdc61235d9109c05b4f55d7423746 UNKNOWN
   * ff97aeedb303fe540ebb3728ed280de414606978 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2818)
 
   * a8902557952ea70746d27e88a392c74724784605 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2848)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-16795) End to end tests timeout on Azure

2020-06-05 Thread Robert Metzger (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126980#comment-17126980
 ] 

Robert Metzger commented on FLINK-16795:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=2825&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5

> End to end tests timeout on Azure
> -
>
> Key: FLINK-16795
> URL: https://issues.apache.org/jira/browse/FLINK-16795
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines, Tests
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Example: 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6650&view=logs&j=08866332-78f7-59e4-4f7e-49a56faa3179
>  or 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6637&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5
> {code}##[error]The job running on agent Azure Pipelines 6 ran longer than the 
> maximum time of 200 minutes. For more information, see 
> https://go.microsoft.com/fwlink/?linkid=2077134
> {code}
> and {code}##[error]The operation was canceled.{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #12453: [FLINK-18061] [table] TableResult#collect method should return a closeable iterator to avoid resource leak

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12453:
URL: https://github.com/apache/flink/pull/12453#issuecomment-637941328


   
   ## CI report:
   
   * ae99323647e5a2febfd0696738b53b540b72f3a2 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2835)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #12506: [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.

2020-06-05 Thread GitBox


flinkbot commented on pull request #12506:
URL: https://github.com/apache/flink/pull/12506#issuecomment-639664025


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit a6e3fc1886259cb1b7efede2596c9b581df06099 (Fri Jun 05 
17:48:33 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] kl0u commented on a change in pull request #12501: [FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config map

2020-06-05 Thread GitBox


kl0u commented on a change in pull request #12501:
URL: https://github.com/apache/flink/pull/12501#discussion_r436070973



##
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
##
@@ -148,6 +149,7 @@ private Pod decoratePod(Pod pod) {
 
// remove kubernetes.config.file

propertiesMap.remove(KubernetesConfigOptions.KUBE_CONFIG_FILE.key());
+   propertiesMap.remove(DeploymentOptionsInternal.CONF_DIR.key());

Review comment:
   I think it would be safer to use `flinkConfig.removeConfig()` which also 
removes fallback keys.
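
   A minimal sketch of that suggestion (assuming a `Configuration` instance named 
`flinkConfig` is in scope at this point in `FlinkConfMountDecorator`, which the diff 
above does not show):

```java
// Unlike propertiesMap.remove(...), which only drops the primary key,
// Configuration#removeConfig removes the option together with its fallback keys.
flinkConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR);
```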





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] AHeise opened a new pull request #12506: [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.

2020-06-05 Thread GitBox


AHeise opened a new pull request #12506:
URL: https://github.com/apache/flink/pull/12506


   
   
   
   [unchanged 1.11 backport of https://github.com/apache/flink/pull/12493]
   
   ## What is the purpose of the change
   
   `CheckpointBarrierUnaligner#hasInflightData` was not called with input gate 
contextual information, so for multi-gate setups only the same first few channels 
were checked during the initial snapshotting of inflight data.
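
   For readers unfamiliar with the flattened index mentioned in the change log below, 
a minimal sketch of the idea (illustrative only, not the actual Flink implementation; 
`channelCountsPerGate` is an assumed helper array holding the number of channels of 
each input gate, in gate order):

```java
// Maps (gateIndex, channelIndex) to a single index over all channels of all gates.
static int flattenedChannelIndex(int gateIndex, int channelIndex, int[] channelCountsPerGate) {
    int offset = 0;
    for (int i = 0; i < gateIndex; i++) {
        offset += channelCountsPerGate[i]; // skip the channels of all earlier gates
    }
    return offset + channelIndex;          // position within the flattened channel space
}
```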
   
   ## Brief change log
   
   - Using `InputChannelInfo` in `hasInflightData` to get the flattened index
   
   ## Verifying this change
   
   - Added `StreamTaskNetworkInputTest#testSnapshotWithTwoInputGates`.
   - Various ITCases with sporadic data losses.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / 
don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zentol commented on a change in pull request #12484: [FLINK-18127][tests] Streamline manual execution of Java E2E tests

2020-06-05 Thread GitBox


zentol commented on a change in pull request #12484:
URL: https://github.com/apache/flink/pull/12484#discussion_r436067756



##
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResourceFactory.java
##
@@ -33,19 +37,83 @@
 public final class LocalStandaloneFlinkResourceFactory implements 
FlinkResourceFactory {
private static final Logger LOG = 
LoggerFactory.getLogger(LocalStandaloneFlinkResourceFactory.class);
 
+   private static final ParameterProperty PROJECT_ROOT_DIRECTORY = 
new ParameterProperty<>("rootDir", Paths::get);
private static final ParameterProperty DISTRIBUTION_DIRECTORY = 
new ParameterProperty<>("distDir", Paths::get);
private static final ParameterProperty 
DISTRIBUTION_LOG_BACKUP_DIRECTORY = new ParameterProperty<>("logBackupDir", 
Paths::get);
 
@Override
public FlinkResource create(FlinkResourceSetup setup) {
Optional distributionDirectory = 
DISTRIBUTION_DIRECTORY.get();
if (!distributionDirectory.isPresent()) {
-   throw new IllegalArgumentException("The distDir 
property was not set. You can set it when running maven via -DdistDir= 
.");
+   // distDir was not explicitly set, let's search for it
+
+   Path projectRootPath;
+   Optional projectRoot = 
PROJECT_ROOT_DIRECTORY.get();
+   if (projectRoot.isPresent()) {
+   // running with maven
+   projectRootPath = projectRoot.get();
+   } else {
+   // running in the IDE; working directory is 
test module
+   Optional projectRootDirectory = 
findProjectRootDirectory(Paths.get("").toAbsolutePath());
+   // this distinction is required in case this 
class is used outside of Flink
+   if (projectRootDirectory.isPresent()) {
+   projectRootPath = 
projectRootDirectory.get();
+   } else {
+   throw new IllegalArgumentException(
+   "The 'distDir' property was not 
set and the flink-dist module could not be found automatically." +
+   " Please point the 
'distDir' property to the directory containing distribution; you can set it 
when running maven via -DdistDir= .");
+   }
+   }
+   Optional distribution = 
findDistribution(projectRootPath);
+   if (!distribution.isPresent()) {
+   throw new IllegalArgumentException(
+   "The 'distDir' property was not set and 
a distribution could not be found automatically." +
+   " Please point the 'distDir' 
property to the directory containing distribution; you can set it when running 
maven via -DdistDir= .");
+   } else {
+   distributionDirectory = distribution;
+   }
}
Optional logBackupDirectory = 
DISTRIBUTION_LOG_BACKUP_DIRECTORY.get();
if (!logBackupDirectory.isPresent()) {
LOG.warn("Property {} not set, logs will not be backed 
up in case of test failures.", 
DISTRIBUTION_LOG_BACKUP_DIRECTORY.getPropertyName());
}
return new 
LocalStandaloneFlinkResource(distributionDirectory.get(), 
logBackupDirectory.orElse(null), setup);
}
+
+   public static void main(String[] args) {
+   
System.out.println(findProjectRootDirectory(Paths.get("").toAbsolutePath()));
+   }

Review comment:
   whoops,





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zentol commented on a change in pull request #12484: [FLINK-18127][tests] Streamline manual execution of Java E2E tests

2020-06-05 Thread GitBox


zentol commented on a change in pull request #12484:
URL: https://github.com/apache/flink/pull/12484#discussion_r436067636



##
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResourceFactory.java
##
@@ -33,19 +37,83 @@
 public final class LocalStandaloneFlinkResourceFactory implements 
FlinkResourceFactory {
private static final Logger LOG = 
LoggerFactory.getLogger(LocalStandaloneFlinkResourceFactory.class);
 
+   private static final ParameterProperty PROJECT_ROOT_DIRECTORY = 
new ParameterProperty<>("rootDir", Paths::get);
private static final ParameterProperty DISTRIBUTION_DIRECTORY = 
new ParameterProperty<>("distDir", Paths::get);
private static final ParameterProperty 
DISTRIBUTION_LOG_BACKUP_DIRECTORY = new ParameterProperty<>("logBackupDir", 
Paths::get);
 
@Override
public FlinkResource create(FlinkResourceSetup setup) {
Optional distributionDirectory = 
DISTRIBUTION_DIRECTORY.get();
if (!distributionDirectory.isPresent()) {
-   throw new IllegalArgumentException("The distDir 
property was not set. You can set it when running maven via -DdistDir= 
.");
+   // distDir was not explicitly set, let's search for it
+
+   Path projectRootPath;
+   Optional projectRoot = 
PROJECT_ROOT_DIRECTORY.get();
+   if (projectRoot.isPresent()) {
+   // running with maven
+   projectRootPath = projectRoot.get();
+   } else {
+   // running in the IDE; working directory is 
test module
+   Optional projectRootDirectory = 
findProjectRootDirectory(Paths.get("").toAbsolutePath());
+   // this distinction is required in case this 
class is used outside of Flink
+   if (projectRootDirectory.isPresent()) {
+   projectRootPath = 
projectRootDirectory.get();
+   } else {
+   throw new IllegalArgumentException(
+   "The 'distDir' property was not 
set and the flink-dist module could not be found automatically." +
+   " Please point the 
'distDir' property to the directory containing distribution; you can set it 
when running maven via -DdistDir= .");
+   }
+   }
+   Optional distribution = 
findDistribution(projectRootPath);
+   if (!distribution.isPresent()) {
+   throw new IllegalArgumentException(
+   "The 'distDir' property was not set and 
a distribution could not be found automatically." +
+   " Please point the 'distDir' 
property to the directory containing distribution; you can set it when running 
maven via -DdistDir= .");
+   } else {
+   distributionDirectory = distribution;
+   }
}
Optional logBackupDirectory = 
DISTRIBUTION_LOG_BACKUP_DIRECTORY.get();
if (!logBackupDirectory.isPresent()) {
LOG.warn("Property {} not set, logs will not be backed 
up in case of test failures.", 
DISTRIBUTION_LOG_BACKUP_DIRECTORY.getPropertyName());
}
return new 
LocalStandaloneFlinkResource(distributionDirectory.get(), 
logBackupDirectory.orElse(null), setup);
}
+
+   public static void main(String[] args) {
+   
System.out.println(findProjectRootDirectory(Paths.get("").toAbsolutePath()));
+   }
+
+   private static Optional findProjectRootDirectory(Path 
currentDirectory) {
+   // move up the module structure until we find flink-dist; 
relies on all modules being prefixed with 'flink'
+   do {
+   if 
(Files.exists(currentDirectory.resolve("flink-dist"))) {
+   return Optional.of(currentDirectory);
+   }
+   currentDirectory = currentDirectory.getParent();
+   }  while 
(currentDirectory.getFileName().toString().startsWith("flink"));
+   return Optional.empty();
+   }
+
+   private static Optional findDistribution(Path 
projectRootDirectory) {
+   final Path distTargetDirectory = 
projectRootDirectory.resolve("flink-dist").resolve("target");
+   try {
+   Collection paths = 
FileUtils.listFilesInDirectory(distTargetDirectory, p -> 
p.getFileName().toString().contains("flink-dist"));
+   

[GitHub] [flink] zhuzhurk commented on a change in pull request #12256: [FLINK-17018][runtime] Allocates slots in bulks for pipelined region scheduling

2020-06-05 Thread GitBox


zhuzhurk commented on a change in pull request #12256:
URL: https://github.com/apache/flink/pull/12256#discussion_r436060993



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java
##
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This slot allocator will request one physical slot for each single 
execution vertex.
+ * The slots will be requested in bulks so that the {@link SlotProvider} can 
check
+ * whether this bulk of slot requests can be fulfilled at the same time.
+ * It has several limitations:
+ *
+ * 1. Slot sharing will be ignored.
+ *
+ * 2. Co-location constraints are not allowed.
+ *
+ * 3. Intra-bulk input location preferences will be ignored.
+ */
+public class OneSlotPerExecutionSlotAllocator extends 
AbstractExecutionSlotAllocator {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OneSlotPerExecutionSlotAllocator.class);
+
+   private final SlotProvider slotProvider;
+
+   private final SlotOwner slotOwner;
+
+   private final boolean slotWillBeOccupiedIndefinitely;
+
+   private final Time allocationTimeout;
+
+   public OneSlotPerExecutionSlotAllocator(
+   final SlotProvider slotProvider,
+   final InputsLocationsRetriever inputsLocationsRetriever,
+   final boolean slotWillBeOccupiedIndefinitely,
+   final Time allocationTimeout) {
+
+   super(inputsLocationsRetriever);
+
+   this.slotProvider = checkNotNull(slotProvider);
+   this.slotWillBeOccupiedIndefinitely = 
slotWillBeOccupiedIndefinitely;
+   this.allocationTimeout = checkNotNull(allocationTimeout);
+
+   this.slotOwner = new 
OneSlotPerExecutionSlotAllocatorSlotOwner();
+   }
+
+   @Override
+   public List allocateSlotsFor(
+   final List 
executionVertexSchedulingRequirements) {
+
+   
validateSchedulingRequirements(executionVertexSchedulingRequirements);
+
+   
validateNoCoLocationConstraint(executionVertexSchedulingRequirements);
+
+   final Set allExecutionVertexIds = 
executionVertexSchedulingRequirements.stream()
+   
.map(ExecutionVertexSchedulingRequirements::getExecutionVertexId)
+   .collect(Collectors.toSet());
+
+   final Map 
executionVertexSlotRequestIds =
+   
generateExecutionVertexSlotRequestIds(allExecutionVertexIds);
+
+   final List 
slotExecutionVertexAssignments =
+   
createSlotExecutionVertexAssignments(executionVertexSchedulingReq

[GitHub] [flink] flinkbot edited a comment on pull request #12460: [FLINK-18063][checkpointing] Fix the race condition for aborting current checkpoint in CheckpointBarrierUnaligner

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12460:
URL: https://github.com/apache/flink/pull/12460#issuecomment-638102290


   
   ## CI report:
   
   * 1d838c03d2b9f9744cae6fc03a919db72cc0efd9 UNKNOWN
   * 31da2083df6221fd36c6ed5674927eed1cc68088 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2828)
 
   * 5729182fd15a074bebba7f878113781c2b119fd3 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2846)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zhijiangW commented on a change in pull request #12460: [FLINK-18063][checkpointing] Fix the race condition for aborting current checkpoint in CheckpointBarrierUnaligner

2020-06-05 Thread GitBox


zhijiangW commented on a change in pull request #12460:
URL: https://github.com/apache/flink/pull/12460#discussion_r436055041



##
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##
@@ -512,6 +512,123 @@ public void 
testConcurrentProcessBarrierAndNotifyBarrierReceived() throws Except
}
}
 
+   /**
+* Tests {@link 
CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)}
+* abort the current pending checkpoint triggered by
+* {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, 
InputChannelInfo)}.
+*/
+   @Test
+   public void testProcessCancellationBarrierWitchPendingCheckpoint() 
throws Exception {
+   final long checkpointId = 0L;
+   final ValidatingCheckpointInvokable invokable = new 
ValidatingCheckpointInvokable();
+   final CheckpointBarrierUnaligner handler = new 
CheckpointBarrierUnaligner(
+   new int[] { 1 }, ChannelStateWriter.NO_OP, "test", 
invokable);
+
+   ThreadSafeUnaligner unaligner = 
handler.getThreadSafeUnaligner();
+   // should trigger respective checkpoint
+   
unaligner.notifyBarrierReceived(buildCheckpointBarrier(checkpointId), new 
InputChannelInfo(0, 0));
+
+   assertFalse(handler.isCheckpointPending());
+   assertTrue(unaligner.isCheckpointPending());
+   assertEquals(-1L, handler.getLatestCheckpointId());
+   assertEquals(checkpointId, unaligner.getCurrentCheckpointId());
+
+   testProcessCancellationBarrier(handler, invokable, 
checkpointId);
+   }
+
+   /**
+* Tests {@link 
CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)}
+* abort the current pending checkpoint triggered by
+* {@link CheckpointBarrierUnaligner#processBarrier(CheckpointBarrier, 
int)}.
+*/
+   @Test
+   public void testProcessCancellationBarrierWitchPendingCheckpoint2() 
throws Exception {
+   final long checkpointId = 0L;
+   final ValidatingCheckpointInvokable invokable = new 
ValidatingCheckpointInvokable();
+   final CheckpointBarrierUnaligner handler = new 
CheckpointBarrierUnaligner(
+   new int[] { 1 }, ChannelStateWriter.NO_OP, "test", 
invokable);
+
+   // should trigger respective checkpoint
+   handler.processBarrier(buildCheckpointBarrier(checkpointId), 0);
+
+   assertTrue(handler.isCheckpointPending());
+   
assertTrue(handler.getThreadSafeUnaligner().isCheckpointPending());
+   assertEquals(checkpointId, handler.getLatestCheckpointId());
+   assertEquals(checkpointId, 
handler.getThreadSafeUnaligner().getCurrentCheckpointId());
+
+   testProcessCancellationBarrier(handler, invokable, 
checkpointId);
+   }
+
+   @Test
+   public void 
testProcessCancellationBarrierBeforeProcessAndReceiveBarrier() throws Exception 
{
+   final long checkpointId = 0L;
+   final ValidatingCheckpointInvokable invokable = new 
ValidatingCheckpointInvokable();
+   final CheckpointBarrierUnaligner handler = new 
CheckpointBarrierUnaligner(
+   new int[] { 1 }, ChannelStateWriter.NO_OP, "test", 
invokable);
+
+   handler.processCancellationBarrier(new 
CancelCheckpointMarker(checkpointId));
+
+   verifyTriggeredCheckpoint(handler, invokable, checkpointId);
+
+   // it would not trigger checkpoint since the respective 
cancellation barrier already happened before
+   handler.processBarrier(buildCheckpointBarrier(checkpointId), 0);
+   
handler.getThreadSafeUnaligner().notifyBarrierReceived(buildCheckpointBarrier(checkpointId),
 new InputChannelInfo(0, 0));
+
+   verifyTriggeredCheckpoint(handler, invokable, checkpointId);
+   }
+
+   private void testProcessCancellationBarrier(
+   CheckpointBarrierUnaligner handler,
+   ValidatingCheckpointInvokable invokable,
+   long currentCheckpointId) throws Exception {
+
+   // should abort current checkpoint while processing 
CancelCheckpointMarker
+   handler.processCancellationBarrier(new 
CancelCheckpointMarker(currentCheckpointId));
+   verifyTriggeredCheckpoint(handler, invokable, 
currentCheckpointId);
+
+   final long canceledCheckpointId = 1L;
+   // should update current checkpoint id and abort notification 
while processing CancelCheckpointMarker
+   handler.processCancellationBarrier(new 
CancelCheckpointMarker(canceledCheckpointId));
+   verifyTriggeredCheckpoint(handler, invokable, 
canceledCheckpointId);

Review comment:

[GitHub] [flink] zhijiangW merged pull request #12493: [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.

2020-06-05 Thread GitBox


zhijiangW merged pull request #12493:
URL: https://github.com/apache/flink/pull/12493


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12503: [FLINK-18151][python] Resolve CWE22 problems in pyflink_gateway_server.py

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12503:
URL: https://github.com/apache/flink/pull/12503#issuecomment-639444578


   
   ## CI report:
   
   * da2d996d1c4170fa70772715ca027cdc7d65407a Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2830)
 
   * ce4f8beceb4672b8daa260f1b484a5790b0ad719 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2839)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12423: [FLINK-18034][runtime] Introduce PreferredLocationsRetriever

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12423:
URL: https://github.com/apache/flink/pull/12423#issuecomment-636712044


   
   ## CI report:
   
   * 362a6480b17c1133aae420e09518b08482edf7e7 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2515)
 
   * 7a2dfb63f296d835193c2ded3ec2f1f868eaf92a Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2845)
 
   * 979c16df9319f87d4b96ff32d7e55823d8e41551 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12460: [FLINK-18063][checkpointing] Fix the race condition for aborting current checkpoint in CheckpointBarrierUnaligner

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12460:
URL: https://github.com/apache/flink/pull/12460#issuecomment-638102290


   
   ## CI report:
   
   * 1d838c03d2b9f9744cae6fc03a919db72cc0efd9 UNKNOWN
   * b2c4d6aaf78612e7214cc49d22e2b90fe29b93b6 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2686)
 
   * 31da2083df6221fd36c6ed5674927eed1cc68088 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2828)
 
   * 5729182fd15a074bebba7f878113781c2b119fd3 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2846)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12493: [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12493:
URL: https://github.com/apache/flink/pull/12493#issuecomment-639156189


   
   ## CI report:
   
   * 1973dc85708e04da3b66b511eb89c84478087dd5 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2829)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12375: [FLINK-17017][runtime] Implements bulk allocation for physical slots

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12375:
URL: https://github.com/apache/flink/pull/12375#issuecomment-635184560


   
   ## CI report:
   
   * cd3fc98c034fdc61235d9109c05b4f55d7423746 UNKNOWN
   * ff97aeedb303fe540ebb3728ed280de414606978 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2818)
 
   * a8902557952ea70746d27e88a392c74724784605 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-18164) null <> 'str' should be true

2020-06-05 Thread Benchao Li (Jira)
Benchao Li created FLINK-18164:
--

 Summary: null <> 'str' should be true
 Key: FLINK-18164
 URL: https://issues.apache.org/jira/browse/FLINK-18164
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Benchao Li


Currently, if we compare null with other literals, the result is always false.
This is because the code gen always assigns a default value (false) to the result.
I think it is a bug that `null <> 'str'` evaluates to false.
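
A minimal sketch of the reported effect (illustrative only, not Flink's actual generated code):

{code:java}
// If the generated comparison initializes its result to false and skips the
// comparison whenever an operand is null, then null <> 'str' evaluates to false.
static boolean notEqual(String left, String right) {
    boolean anyNull = (left == null) || (right == null); // null flag tracked by code gen
    boolean result = false;                              // default value of the result
    if (!anyNull) {
        result = !left.equals(right);
    }
    return result; // notEqual(null, "str") stays false, which is the reported bug
}
{code}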

It's reported from user-zh: 
http://apache-flink.147419.n8.nabble.com/flink-sql-null-false-td3640.html

CC [~jark] [~ykt836]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17560) No Slots available exception in Apache Flink Job Manager while Scheduling

2020-06-05 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126933#comment-17126933
 ] 

Chesnay Schepler commented on FLINK-17560:
--

There's a ConcurrentModificationException in the TM logs when the slots are 
being offered. If this is a bug in the slot allocation protocol then the only 
option I see is to try a later Flink version.

Are they running a customized Flink version?

> No Slots available exception in Apache Flink Job Manager while Scheduling
> -
>
> Key: FLINK-17560
> URL: https://issues.apache.org/jira/browse/FLINK-17560
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.8.3
> Environment: Flink version 1.8.3
> Session cluster
>Reporter: josson paul kalapparambath
>Priority: Major
> Attachments: jobmgr.log, threaddump-tm.txt, tm.log
>
>
> Set up
> --
> Flink version 1.8.3
> Zookeeper HA cluster
> 1 ResourceManager/Dispatcher (Same Node)
> 1 TaskManager
> 4 pipelines running with various parallelism's
> Issue
> --
> Occasionally when the Job Manager gets restarted we noticed that all the 
> pipelines are not getting scheduled. The error that is reported by the Job 
> Manager is 'not enough slots are available'. This should not be the case 
> because the task manager was deployed with sufficient slots for the number of 
> pipelines/parallelism we have.
> We further noticed that the slot report sent by the taskmanager contains slots 
> filled with old CANCELLED job Ids. I am not sure why the task manager still 
> holds the details of the old jobs. A thread dump on the task manager confirms 
> that old pipelines are not running.
> I am aware of https://issues.apache.org/jira/browse/FLINK-12865. But this is 
> not the issue happening in this case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #12505: [FLINK-17765] Simplify Flink stack traces

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12505:
URL: https://github.com/apache/flink/pull/12505#issuecomment-639597366


   
   ## CI report:
   
   * 9e1468b9780de1ceecae32707a836f05561a14b1 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2844)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12471: [FLINK-18073][FLINK-18029][avro] Fix AvroRowDataSerializationSchema is not serializable and add IT cases

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12471:
URL: https://github.com/apache/flink/pull/12471#issuecomment-638362836


   
   ## CI report:
   
   * 4bcc31b01d257710de3d5828167f97e0cd00ff03 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2826)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12423: [FLINK-18034][runtime] Introduce PreferredLocationsRetriever

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12423:
URL: https://github.com/apache/flink/pull/12423#issuecomment-636712044


   
   ## CI report:
   
   * 362a6480b17c1133aae420e09518b08482edf7e7 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2515)
 
   * 7a2dfb63f296d835193c2ded3ec2f1f868eaf92a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zhuzhurk commented on a change in pull request #12423: [FLINK-18034][runtime] Introduce PreferredLocationsRetriever

2020-06-05 Thread GitBox


zhuzhurk commented on a change in pull request #12423:
URL: https://github.com/apache/flink/pull/12423#discussion_r436028808



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocator.java
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Base class for all {@link ExecutionSlotAllocator}. It is responsible to 
allocate slots for tasks and
+ * keep the unfulfilled slot requests for further cancellation.
+ */
+public abstract class AbstractExecutionSlotAllocator implements 
ExecutionSlotAllocator {
+
+   /**
+* Store the uncompleted slot assignments.
+*/
+   protected final Map 
pendingSlotAssignments;
+
+   protected final PreferredLocationsRetriever preferredLocationsRetriever;
+
+   public AbstractExecutionSlotAllocator(final PreferredLocationsRetriever 
preferredLocationsRetriever) {
+   this.preferredLocationsRetriever = 
checkNotNull(preferredLocationsRetriever);
+   this.pendingSlotAssignments = new HashMap<>();
+   }
+
+   @Override
+   public void cancel(final ExecutionVertexID executionVertexId) {
+   final SlotExecutionVertexAssignment 
slotExecutionVertexAssignment = pendingSlotAssignments.get(executionVertexId);
+   if (slotExecutionVertexAssignment != null) {
+   
slotExecutionVertexAssignment.getLogicalSlotFuture().cancel(false);
+   }
+   }
+
+   @Override
+   public CompletableFuture stop() {
+   final List executionVertexIds = new 
ArrayList<>(pendingSlotAssignments.keySet());
+   executionVertexIds.forEach(this::cancel);
+

Review comment:
   I will make the change "getNumberOfPendingSlotAssignments -> 
getPendingSlotAssignments" in #12256 when introducing 
`AbstractExecutionSlotAllocator`.
   It would be good for verifying the existence of a given assignment.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zhuzhurk commented on a change in pull request #12423: [FLINK-18034][runtime] Introduce PreferredLocationsRetriever

2020-06-05 Thread GitBox


zhuzhurk commented on a change in pull request #12423:
URL: https://github.com/apache/flink/pull/12423#discussion_r436024342



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocator.java
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Base class for all {@link ExecutionSlotAllocator}. It is responsible to 
allocate slots for tasks and
+ * keep the unfulfilled slot requests for further cancellation.
+ */
+public abstract class AbstractExecutionSlotAllocator implements 
ExecutionSlotAllocator {
+
+   /**
+* Store the uncompleted slot assignments.
+*/
+   protected final Map 
pendingSlotAssignments;
+
+   protected final PreferredLocationsRetriever preferredLocationsRetriever;
+
+   public AbstractExecutionSlotAllocator(final PreferredLocationsRetriever 
preferredLocationsRetriever) {
+   this.preferredLocationsRetriever = 
checkNotNull(preferredLocationsRetriever);
+   this.pendingSlotAssignments = new HashMap<>();
+   }
+
+   @Override
+   public void cancel(final ExecutionVertexID executionVertexId) {
+   final SlotExecutionVertexAssignment 
slotExecutionVertexAssignment = pendingSlotAssignments.get(executionVertexId);
+   if (slotExecutionVertexAssignment != null) {
+   
slotExecutionVertexAssignment.getLogicalSlotFuture().cancel(false);
+   }
+   }
+
+   @Override
+   public CompletableFuture stop() {
+   final List executionVertexIds = new 
ArrayList<>(pendingSlotAssignments.keySet());
+   executionVertexIds.forEach(this::cancel);
+

Review comment:
   `ExecutionSlotAllocator#stop()` is removed via hotfix 
7a2dfb63f296d835193c2ded3ec2f1f868eaf92a





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (FLINK-17973) Test memory configuration of Flink cluster

2020-06-05 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-17973:


Assignee: Chesnay Schepler

> Test memory configuration of Flink cluster
> --
>
> Key: FLINK-17973
> URL: https://issues.apache.org/jira/browse/FLINK-17973
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Tests
>Affects Versions: 1.11.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: release-testing
> Fix For: 1.11.0
>
>
> Make sure that Flink processes (in particular Master processes) fail with a 
> meaningful exception message if they exceed the configured memory budgets.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18163) Should be volatile: network.api.writer.RecordWriter.flusherException

2020-06-05 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-18163:
-

 Summary: Should be volatile: 
network.api.writer.RecordWriter.flusherException 
 Key: FLINK-18163
 URL: https://issues.apache.org/jira/browse/FLINK-18163
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.11.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.12.0
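
For context, a simplified illustration of the visibility issue behind this ticket 
(class and method names are stand-ins, not Flink's actual code): the exception is 
set by the separate output-flusher thread and read by the task thread, so without 
volatile the reading thread may never observe the write.

{code:java}
import java.io.IOException;

// Simplified stand-in for the pattern in question; not the real RecordWriter.
class RecordWriterSketch {

    // Written by the flusher thread, read by the task thread on every emit.
    private volatile Throwable flusherException;

    // Called from the flusher thread when an asynchronous flush fails.
    void notifyFlusherException(Throwable t) {
        if (flusherException == null) {
            flusherException = t;
        }
    }

    // Called from the task thread; must reliably see the flusher thread's write.
    void checkErroneous() throws IOException {
        if (flusherException != null) {
            throw new IOException("An exception happened while flushing the outputs", flusherException);
        }
    }
}
{code}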






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhuzhurk commented on a change in pull request #12423: [FLINK-18034][runtime] Introduce PreferredLocationsRetriever

2020-06-05 Thread GitBox


zhuzhurk commented on a change in pull request #12423:
URL: https://github.com/apache/flink/pull/12423#discussion_r436018689



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.flink.runtime.executiongraph.ExecutionVertex.MAX_DISTINCT_LOCATIONS_TO_CONSIDER;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Default implementation of {@link PreferredLocationsRetriever}.
+ * Locations based on state will be returned if exist.
+ * Otherwise locations based on inputs will be returned.
+ */
+public class DefaultPreferredLocationsRetriever implements 
PreferredLocationsRetriever {
+
+   private final StateLocationRetriever stateLocationRetriever;
+
+   private final InputsLocationsRetriever inputsLocationsRetriever;
+
+   DefaultPreferredLocationsRetriever(
+   final StateLocationRetriever stateLocationRetriever,
+   final InputsLocationsRetriever 
inputsLocationsRetriever) {
+
+   this.stateLocationRetriever = 
checkNotNull(stateLocationRetriever);
+   this.inputsLocationsRetriever = 
checkNotNull(inputsLocationsRetriever);
+   }
+
+   @Override
+   public CompletableFuture> 
getPreferredLocations(
+   final ExecutionVertexID executionVertexId,
+   final Set producersToIgnore) {
+
+   checkNotNull(executionVertexId);
+   checkNotNull(producersToIgnore);
+
+   final Collection 
preferredLocationsBasedOnState =
+   getPreferredLocationsBasedOnState(executionVertexId, 
stateLocationRetriever);
+   if (!preferredLocationsBasedOnState.isEmpty()) {
+   return 
CompletableFuture.completedFuture(preferredLocationsBasedOnState);
+   }
+
+   return getPreferredLocationsBasedOnInputs(executionVertexId, 
producersToIgnore, inputsLocationsRetriever);
+   }
+
+   private static Collection 
getPreferredLocationsBasedOnState(
+   final ExecutionVertexID executionVertexId,
+   final StateLocationRetriever stateLocationRetriever) {
+
+   return 
stateLocationRetriever.getStateLocation(executionVertexId)
+   .map(Collections::singleton)
+   .orElse(Collections.emptySet());
+   }
+
+   /**
+* Gets the location preferences of the execution, as determined by the 
locations
+* of the predecessors from which it receives input data.
+* If there are more than {@link 
ExecutionVertex#MAX_DISTINCT_LOCATIONS_TO_CONSIDER} different locations of 
source data,
+* or neither the sources have not been started nor will be started 
with the execution together,
+* this method returns an empty collection to indicate no location 
preference.
+*
+* @return The preferred locations based in input streams, or an empty 
iterable,
+* if there is no input-based preference.
+*/
+   static CompletableFuture> 
getPreferredLocationsBasedOnInputs(

Review comment:
   done.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/PreferredLocationsRetriever.java
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF lice

[GitHub] [flink] zhuzhurk commented on a change in pull request #12423: [FLINK-18034][runtime] Introduce PreferredLocationsRetriever

2020-06-05 Thread GitBox


zhuzhurk commented on a change in pull request #12423:
URL: https://github.com/apache/flink/pull/12423#discussion_r436018529



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.flink.runtime.executiongraph.ExecutionVertex.MAX_DISTINCT_LOCATIONS_TO_CONSIDER;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Default implementation of {@link PreferredLocationsRetriever}.
+ * Locations based on state will be returned if exist.
+ * Otherwise locations based on inputs will be returned.
+ */
+public class DefaultPreferredLocationsRetriever implements 
PreferredLocationsRetriever {
+
+   private final StateLocationRetriever stateLocationRetriever;
+
+   private final InputsLocationsRetriever inputsLocationsRetriever;
+
+   DefaultPreferredLocationsRetriever(
+   final StateLocationRetriever stateLocationRetriever,
+   final InputsLocationsRetriever 
inputsLocationsRetriever) {
+
+   this.stateLocationRetriever = 
checkNotNull(stateLocationRetriever);
+   this.inputsLocationsRetriever = 
checkNotNull(inputsLocationsRetriever);
+   }
+
+   @Override
+   public CompletableFuture> 
getPreferredLocations(
+   final ExecutionVertexID executionVertexId,
+   final Set producersToIgnore) {
+
+   checkNotNull(executionVertexId);
+   checkNotNull(producersToIgnore);
+
+   final Collection 
preferredLocationsBasedOnState =
+   getPreferredLocationsBasedOnState(executionVertexId, 
stateLocationRetriever);
+   if (!preferredLocationsBasedOnState.isEmpty()) {
+   return 
CompletableFuture.completedFuture(preferredLocationsBasedOnState);
+   }
+
+   return getPreferredLocationsBasedOnInputs(executionVertexId, 
producersToIgnore, inputsLocationsRetriever);
+   }
+
+   private static Collection 
getPreferredLocationsBasedOnState(
+   final ExecutionVertexID executionVertexId,
+   final StateLocationRetriever stateLocationRetriever) {
+
+   return 
stateLocationRetriever.getStateLocation(executionVertexId)
+   .map(Collections::singleton)
+   .orElse(Collections.emptySet());
+   }
+
+   /**
+* Gets the location preferences of the execution, as determined by the 
locations
+* of the predecessors from which it receives input data.
+* If there are more than {@link 
ExecutionVertex#MAX_DISTINCT_LOCATIONS_TO_CONSIDER} different locations of 
source data,
+* or neither the sources have not been started nor will be started 
with the execution together,
+* this method returns an empty collection to indicate no location 
preference.
+*
+* @return The preferred locations based in input streams, or an empty 
iterable,
+* if there is no input-based preference.
+*/
+   static CompletableFuture> 
getPreferredLocationsBasedOnInputs(
+   final ExecutionVertexID executionVertexId,
+   final Set producersToIgnore,
+   final InputsLocationsRetriever 
inputsLocationsRetriever) {
+
+   CompletableFuture> 
preferredLocations =
+   
CompletableFuture.completedFuture(Collections.emptyList());
+
+   final Collection> 
locationsFutures = new ArrayLis

[GitHub] [flink] tillrohrmann commented on a change in pull request #12484: [FLINK-18127][tests] Streamline manual execution of Java E2E tests

2020-06-05 Thread GitBox


tillrohrmann commented on a change in pull request #12484:
URL: https://github.com/apache/flink/pull/12484#discussion_r436017785



##
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResourceFactory.java
##
@@ -33,19 +37,83 @@
 public final class LocalStandaloneFlinkResourceFactory implements 
FlinkResourceFactory {
private static final Logger LOG = 
LoggerFactory.getLogger(LocalStandaloneFlinkResourceFactory.class);
 
+   private static final ParameterProperty PROJECT_ROOT_DIRECTORY = 
new ParameterProperty<>("rootDir", Paths::get);
private static final ParameterProperty DISTRIBUTION_DIRECTORY = 
new ParameterProperty<>("distDir", Paths::get);
private static final ParameterProperty 
DISTRIBUTION_LOG_BACKUP_DIRECTORY = new ParameterProperty<>("logBackupDir", 
Paths::get);
 
@Override
public FlinkResource create(FlinkResourceSetup setup) {
Optional distributionDirectory = 
DISTRIBUTION_DIRECTORY.get();
if (!distributionDirectory.isPresent()) {
-   throw new IllegalArgumentException("The distDir 
property was not set. You can set it when running maven via -DdistDir= 
.");
+   // distDir was not explicitly set, let's search for it
+
+   Path projectRootPath;
+   Optional projectRoot = 
PROJECT_ROOT_DIRECTORY.get();
+   if (projectRoot.isPresent()) {
+   // running with maven
+   projectRootPath = projectRoot.get();
+   } else {
+   // running in the IDE; working directory is 
test module
+   Optional projectRootDirectory = 
findProjectRootDirectory(Paths.get("").toAbsolutePath());
+   // this distinction is required in case this 
class is used outside of Flink
+   if (projectRootDirectory.isPresent()) {
+   projectRootPath = 
projectRootDirectory.get();
+   } else {
+   throw new IllegalArgumentException(
+   "The 'distDir' property was not 
set and the flink-dist module could not be found automatically." +
+   " Please point the 
'distDir' property to the directory containing distribution; you can set it 
when running maven via -DdistDir= .");
+   }
+   }
+   Optional distribution = 
findDistribution(projectRootPath);
+   if (!distribution.isPresent()) {
+   throw new IllegalArgumentException(
+   "The 'distDir' property was not set and 
a distribution could not be found automatically." +
+   " Please point the 'distDir' 
property to the directory containing distribution; you can set it when running 
maven via -DdistDir= .");
+   } else {
+   distributionDirectory = distribution;
+   }
}
Optional logBackupDirectory = 
DISTRIBUTION_LOG_BACKUP_DIRECTORY.get();
if (!logBackupDirectory.isPresent()) {
LOG.warn("Property {} not set, logs will not be backed 
up in case of test failures.", 
DISTRIBUTION_LOG_BACKUP_DIRECTORY.getPropertyName());
}
return new 
LocalStandaloneFlinkResource(distributionDirectory.get(), 
logBackupDirectory.orElse(null), setup);
}
+
+   public static void main(String[] args) {
+   
System.out.println(findProjectRootDirectory(Paths.get("").toAbsolutePath()));
+   }
+
+   private static Optional findProjectRootDirectory(Path 
currentDirectory) {
+   // move up the module structure until we find flink-dist; 
relies on all modules being prefixed with 'flink'
+   do {
+   if 
(Files.exists(currentDirectory.resolve("flink-dist"))) {
+   return Optional.of(currentDirectory);
+   }
+   currentDirectory = currentDirectory.getParent();
+   }  while 
(currentDirectory.getFileName().toString().startsWith("flink"));
+   return Optional.empty();
+   }
+
+   private static Optional findDistribution(Path 
projectRootDirectory) {
+   final Path distTargetDirectory = 
projectRootDirectory.resolve("flink-dist").resolve("target");
+   try {
+   Collection paths = 
FileUtils.listFilesInDirectory(distTargetDirectory, p -> 
p.getFileName().toString().contains("flink-dist"));
+ 

[GitHub] [flink] zhuzhurk commented on a change in pull request #12423: [FLINK-18034][runtime] Introduce PreferredLocationsRetriever

2020-06-05 Thread GitBox


zhuzhurk commented on a change in pull request #12423:
URL: https://github.com/apache/flink/pull/12423#discussion_r436018318



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirements.java
##
@@ -169,8 +161,7 @@ public ExecutionVertexSchedulingRequirements build() {
taskResourceProfile,
physicalSlotResourceProfile,
slotSharingGroupId,
-   coLocationConstraint,
-   preferredLocations);
+   coLocationConstraint);

Review comment:
   done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #12505: [FLINK-17765] Simplify Flink stack traces

2020-06-05 Thread GitBox


flinkbot commented on pull request #12505:
URL: https://github.com/apache/flink/pull/12505#issuecomment-639597366


   
   ## CI report:
   
   * 9e1468b9780de1ceecae32707a836f05561a14b1 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12446: [FLINK-16225] Implement user class loading exception handler

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12446:
URL: https://github.com/apache/flink/pull/12446#issuecomment-637647665


   
   ## CI report:
   
   * 7af38d20bc7c7e484cd63b36ec2d603f14a99dfc Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2840)
 
   * 426791910cdcdc50824687d842a182f4036dbfc1 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2843)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12460: [FLINK-18063][checkpointing] Fix the race condition for aborting current checkpoint in CheckpointBarrierUnaligner

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12460:
URL: https://github.com/apache/flink/pull/12460#issuecomment-638102290


   
   ## CI report:
   
   * 1d838c03d2b9f9744cae6fc03a919db72cc0efd9 UNKNOWN
   * b2c4d6aaf78612e7214cc49d22e2b90fe29b93b6 Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2686)
 
   * 31da2083df6221fd36c6ed5674927eed1cc68088 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2828)
 
   * 5729182fd15a074bebba7f878113781c2b119fd3 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (FLINK-17765) Verbose client error messages

2020-06-05 Thread Till Rohrmann (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-17765:
-

Assignee: Till Rohrmann  (was: Chesnay Schepler)

> Verbose client error messages
> -
>
> Key: FLINK-17765
> URL: https://issues.apache.org/jira/browse/FLINK-17765
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission, Runtime / Coordination, Runtime 
> / REST
>Affects Versions: 1.10.1, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
> Some client operations if they fail produce very verbose error messages which 
> are hard to decipher for the user. For example, if the job submission fails 
> because the savepoint path does not exist, then the user sees the following:
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:148)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:689)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:227)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:906)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:982)
> at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:982)
> Caused by: java.lang.RuntimeException: 
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> at 
> org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:290)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1766)
> at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:104)
> at 
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:71)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1645)
> at 
> org.apache.flink.streaming.examples.statemachine.StateMachineExample.main(StateMachineExample.java:142)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ... 8 more
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1761)
> ... 17 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
> submit JobGraph.
> at 
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:366)
> at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:290)
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at 
> java.util.concurrent.CompletableFuture.postComple

[GitHub] [flink] flinkbot commented on pull request #12505: [FLINK-17765] Simplify Flink stack traces

2020-06-05 Thread GitBox


flinkbot commented on pull request #12505:
URL: https://github.com/apache/flink/pull/12505#issuecomment-639582390


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 9e1468b9780de1ceecae32707a836f05561a14b1 (Fri Jun 05 
15:47:10 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wuchong commented on a change in pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

2020-06-05 Thread GitBox


wuchong commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r436004322



##
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##
@@ -490,8 +490,11 @@ private void insertIntoInternal(UnresolvedIdentifier 
unresolvedIdentifier, Table
private Optional 
scanInternal(UnresolvedIdentifier identifier) {
ObjectIdentifier tableIdentifier = 
catalogManager.qualifyIdentifier(identifier);
 
-   return catalogManager.getTable(tableIdentifier)
-   .map(t -> new CatalogQueryOperation(tableIdentifier, 
t.getTable().getSchema()));
+   return catalogManager.getTable(tableIdentifier).map(t -> {
+   CatalogTableSchemaResolver resolver = new 
CatalogTableSchemaResolver(parser);

Review comment:
   Putting it in `CatalogManager#getTable` sounds good to me. We thought of 
having a single util to resolve the schema. But I think you have a better idea: 
resolving the schema before the CatalogTable is exposed out of CatalogManager 
can avoid forgetting to resolve the schema. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wuchong commented on a change in pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

2020-06-05 Thread GitBox


wuchong commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r436004322



##
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##
@@ -490,8 +490,11 @@ private void insertIntoInternal(UnresolvedIdentifier 
unresolvedIdentifier, Table
private Optional 
scanInternal(UnresolvedIdentifier identifier) {
ObjectIdentifier tableIdentifier = 
catalogManager.qualifyIdentifier(identifier);
 
-   return catalogManager.getTable(tableIdentifier)
-   .map(t -> new CatalogQueryOperation(tableIdentifier, 
t.getTable().getSchema()));
+   return catalogManager.getTable(tableIdentifier).map(t -> {
+   CatalogTableSchemaResolver resolver = new 
CatalogTableSchemaResolver(parser);

Review comment:
   Putting it in `CatalogManager#getTable` sounds good to me. We thought of 
having a single util to resolve the schema. But I think you have a better idea: 
resolving the schema before the CatalogTable is exposed out of CatalogManager 
can avoid forgetting to resolve the schema. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-17380) Kafka09ITCase.runAllDeletesTest: "Memory records is not writable"

2020-06-05 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-17380:
-
Summary: Kafka09ITCase.runAllDeletesTest: "Memory records is not writable"  
(was: runAllDeletesTest: "Memory records is not writable")

> Kafka09ITCase.runAllDeletesTest: "Memory records is not writable"
> -
>
> Key: FLINK-17380
> URL: https://issues.apache.org/jira/browse/FLINK-17380
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Tests
>Affects Versions: 1.10.0
>Reporter: Robert Metzger
>Priority: Major
>  Labels: test-stability
>
> CI Run: https://travis-ci.org/github/apache/flink/jobs/678707334
> {code}
> 23:27:15,112 [main] ERROR 
> org.apache.flink.streaming.connectors.kafka.Kafka09ITCase - 
> 
> Test 
> testAllDeletes(org.apache.flink.streaming.connectors.kafka.Kafka09ITCase) 
> failed with:
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:648)
>   at 
> org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:77)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runAllDeletesTest(KafkaConsumerTestBase.java:1475)
>   at 
> org.apache.flink.streaming.connectors.kafka.Kafka09ITCase.testAllDeletes(Kafka09ITCase.java:117)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
>   at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>   at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>   at akka.japi.pf.UnitCaseS

[GitHub] [flink] tillrohrmann commented on a change in pull request #12446: [FLINK-16225] Implement user class loading exception handler

2020-06-05 Thread GitBox


tillrohrmann commented on a change in pull request #12446:
URL: https://github.com/apache/flink/pull/12446#discussion_r435997739



##
File path: 
flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java
##
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.function.Consumer;
+
+/**
+ * This class loader accepts a custom handler if an exception occurs in {@link 
#loadClass(String, boolean)}.
+ */
+public abstract class FlinkUserCodeClassLoader extends URLClassLoader {
+   public static final Consumer EMPTY_EXCEPTION_HANDLER = 
classLoadingException -> {};

Review comment:
   nit:
   ```suggestion
public static final Consumer NOOP_EXCEPTION_HANDLER = 
classLoadingException -> {};
   ```

##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
##
@@ -79,7 +79,9 @@ public void testRecoveryRegisterAndDownload() throws 
Exception {
 
final BlobLibraryCacheManager.ClassLoaderFactory 
classLoaderFactory = BlobLibraryCacheManager.defaultClassLoaderFactory(

FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
-   new String[0]);
+   new String[0],
+   false,
+   exception -> {});

Review comment:
   Instead of passing a boolean and the fatal error handler to 
`defaultClassLoaderFactory()` one could also change it to pass in the 
`exception handler` instead. That way one would not pass in redundant 
information `false` and `exception -> {}` but it would suffice to simply pass 
`exception -> {}`.

##
File path: 
flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
##
@@ -110,6 +110,14 @@
" resolved through the parent ClassLoader first. A 
pattern is a simple prefix that is checked against" +
" the fully qualified class name. These patterns are 
appended to \"" + ALWAYS_PARENT_FIRST_LOADER_PATTERNS.key() + "\".");
 
+   @Documentation.Section(Documentation.Sections.EXPERT_CLASS_LOADING)
+   public static final ConfigOption 
FAIL_USER_CLASS_LOADING_METASPACE_OOM = ConfigOptions
+   .key("classloader.fail.on.metaspace.oom.error")
+   .booleanType()
+   .defaultValue(true)
+   .withDescription("Sets whether to fail and exit TaskManager JVM 
process if 'OutOfMemoryError: Metaspace' is " +

Review comment:
   ```suggestion
.withDescription("Fail Flink JVM processes if 
'OutOfMemoryError: Metaspace' is " +
   ```

##
File path: 
flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
##
@@ -110,6 +110,14 @@
" resolved through the parent ClassLoader first. A 
pattern is a simple prefix that is checked against" +
" the fully qualified class name. These patterns are 
appended to \"" + ALWAYS_PARENT_FIRST_LOADER_PATTERNS.key() + "\".");
 
+   @Documentation.Section(Documentation.Sections.EXPERT_CLASS_LOADING)
+   public static final ConfigOption 
FAIL_USER_CLASS_LOADING_METASPACE_OOM = ConfigOptions
+   .key("classloader.fail.on.metaspace.oom.error")

Review comment:
   ```suggestion
.key("classloader.fail-on-metaspace-oom-error")
   ```

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
##
@@ -151,12 +163,31 @@ public URLClassLoader createClassLoader(URL[] 
libraryURLs) {
classLoaderResolveOrder,
libraryURLs,

FlinkUserCodeClassLoaders.class.getClassLoader(),
-   alwaysParentFirstPatterns);
+   alwaysParentFirstPatterns,
+   classLoadingExceptionHandler);
}
}
 
-   public sta

[jira] [Updated] (FLINK-16768) HadoopS3RecoverableWriterITCase.testRecoverWithStateWithMultiPart hangs

2020-06-05 Thread Aljoscha Krettek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-16768:
-
Issue Type: Bug  (was: Task)

> HadoopS3RecoverableWriterITCase.testRecoverWithStateWithMultiPart hangs
> ---
>
> Key: FLINK-16768
> URL: https://issues.apache.org/jira/browse/FLINK-16768
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Tests
>Affects Versions: 1.10.0
>Reporter: Zhijiang
>Priority: Major
>  Labels: test-stability
> Fix For: 1.11.0
>
>
> Logs: 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6584&view=logs&j=d44f43ce-542c-597d-bf94-b0718c71e5e8&t=d26b3528-38b0-53d2-05f7-37557c2405e4]
> {code:java}
> 2020-03-24T15:52:18.9196862Z "main" #1 prio=5 os_prio=0 
> tid=0x7fd36c00b800 nid=0xc21 runnable [0x7fd3743ce000]
> 2020-03-24T15:52:18.9197235Zjava.lang.Thread.State: RUNNABLE
> 2020-03-24T15:52:18.9197536Z  at 
> java.net.SocketInputStream.socketRead0(Native Method)
> 2020-03-24T15:52:18.9197931Z  at 
> java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
> 2020-03-24T15:52:18.9198340Z  at 
> java.net.SocketInputStream.read(SocketInputStream.java:171)
> 2020-03-24T15:52:18.9198749Z  at 
> java.net.SocketInputStream.read(SocketInputStream.java:141)
> 2020-03-24T15:52:18.9199171Z  at 
> sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
> 2020-03-24T15:52:18.9199840Z  at 
> sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593)
> 2020-03-24T15:52:18.9200265Z  at 
> sun.security.ssl.InputRecord.read(InputRecord.java:532)
> 2020-03-24T15:52:18.9200663Z  at 
> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:975)
> 2020-03-24T15:52:18.9201213Z  - locked <0x927583d8> (a 
> java.lang.Object)
> 2020-03-24T15:52:18.9201589Z  at 
> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:933)
> 2020-03-24T15:52:18.9202026Z  at 
> sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
> 2020-03-24T15:52:18.9202583Z  - locked <0x92758c00> (a 
> sun.security.ssl.AppInputStream)
> 2020-03-24T15:52:18.9203029Z  at 
> org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
> 2020-03-24T15:52:18.9203558Z  at 
> org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:198)
> 2020-03-24T15:52:18.9204121Z  at 
> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:176)
> 2020-03-24T15:52:18.9204626Z  at 
> org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135)
> 2020-03-24T15:52:18.9205121Z  at 
> com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
> 2020-03-24T15:52:18.9205679Z  at 
> com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
> 2020-03-24T15:52:18.9206164Z  at 
> com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
> 2020-03-24T15:52:18.9206786Z  at 
> com.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:125)
> 2020-03-24T15:52:18.9207361Z  at 
> com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
> 2020-03-24T15:52:18.9207839Z  at 
> com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
> 2020-03-24T15:52:18.9208327Z  at 
> com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
> 2020-03-24T15:52:18.9208809Z  at 
> com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
> 2020-03-24T15:52:18.9209273Z  at 
> com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
> 2020-03-24T15:52:18.9210003Z  at 
> com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:107)
> 2020-03-24T15:52:18.9210658Z  at 
> com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
> 2020-03-24T15:52:18.9211154Z  at 
> org.apache.hadoop.fs.s3a.S3AInputStream.lambda$read$3(S3AInputStream.java:445)
> 2020-03-24T15:52:18.9211631Z  at 
> org.apache.hadoop.fs.s3a.S3AInputStream$$Lambda$42/1936375962.execute(Unknown 
> Source)
> 2020-03-24T15:52:18.9212044Z  at 
> org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
> 2020-03-24T15:52:18.9212553Z  at 
> org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
> 2020-03-24T15:52:18.9212972Z  at 
> org.apache.hadoop.fs.s3a.Invoker$$Lambda$23/1457226878.execute(Unknown Source)
> 2020-03-24T15:52:18.9213408Z  at 
> org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
> 2020-03-24T15:52:18.9213866Z  at 
> org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
> 2020-03-24T15:52:18.9214273Z  at 
> org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
> 2020-03-24T15:52:18.9214701Z  at 

[jira] [Commented] (FLINK-18161) Changing parallelism is not possible in sql-client.sh

2020-06-05 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126882#comment-17126882
 ] 

Jark Wu commented on FLINK-18161:
-

Could we just allow SQL CLI to set Flink configurations? This will also make it 
possible to set checkpointing/watermark in SQL CLI. 

> Changing parallelism is not possible in sql-client.sh
> -
>
> Key: FLINK-18161
> URL: https://issues.apache.org/jira/browse/FLINK-18161
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Priority: Critical
> Fix For: 1.11.0
>
>
> I tried using 
> {code}
> SET execution.parallelism=12
> {code}
> and changing the parallelism in the configuration file.
> My SQL queries were always running with p=1 for all operators.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18096) Generated avro formats should support user specified name and namespace

2020-06-05 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126877#comment-17126877
 ] 

Jark Wu commented on FLINK-18096:
-

This sounds like you will need an additional util to get an Avro Schema or schema 
string from a TableSchema/Table, like a reverse of FLINK-18158. 

> Generated avro formats should support user specified name and namespace
> ---
>
> Key: FLINK-18096
> URL: https://issues.apache.org/jira/browse/FLINK-18096
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Seth Wiesman
>Priority: Major
>
> When avro schema is auto derived it should still be possible to specify 
> namespace and name. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #12476: [FLINK-17902][python] Support the new interfaces about temporary functions in PyFlink

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12476:
URL: https://github.com/apache/flink/pull/12476#issuecomment-638622433


   
   ## CI report:
   
   * 049d7d45991db1164ff983ec3fd6c0b23ad6f07f Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2817)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #12504: [FLINK-16350] Support Zookeeper 3.5 in test_ha_per_job_cluster_datastream.sh

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #12504:
URL: https://github.com/apache/flink/pull/12504#issuecomment-639551716


   
   ## CI report:
   
   * 9e7ffdf88b132d86eba7dd516c9b16d1b705758a Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2842)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (FLINK-18161) Changing parallelism is not possible in sql-client.sh

2020-06-05 Thread Dawid Wysakowicz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126875#comment-17126875
 ] 

Dawid Wysakowicz edited comment on FLINK-18161 at 6/5/20, 3:25 PM:
---

The simplest fix would be to overwrite {{parallelism.default}} with the value of 
{{execution.parallelism}} (which is a sql-client-only option).
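
A minimal sketch of that overwrite, assuming the sql-client ends up building a 
Flink {{Configuration}} (the surrounding class and method are illustrative only):
{code}
// Illustrative only: copy the sql-client specific execution.parallelism value
// into parallelism.default (CoreOptions.DEFAULT_PARALLELISM) on the Flink config.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;

public final class ParallelismOverride {

    public static void applyExecutionParallelism(
            Configuration flinkConfig, Integer executionParallelism) {
        if (executionParallelism != null) {
            flinkConfig.setInteger(CoreOptions.DEFAULT_PARALLELISM, executionParallelism);
        }
    }
}
{code}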


was (Author: dawidwys):
The simplest fix would be to set the value of {{execution.parallelism}} to the 
{{parallelism.default}}.

> Changing parallelism is not possible in sql-client.sh
> -
>
> Key: FLINK-18161
> URL: https://issues.apache.org/jira/browse/FLINK-18161
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Priority: Critical
> Fix For: 1.11.0
>
>
> I tried using 
> {code}
> SET execution.parallelism=12
> {code}
> and changing the parallelism in the configuration file.
> My SQL queries were always running with p=1 for all operators.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18161) Changing parallelism is not possible in sql-client.sh

2020-06-05 Thread Dawid Wysakowicz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126875#comment-17126875
 ] 

Dawid Wysakowicz commented on FLINK-18161:
--

The simplest fix would be to set the value of {{execution.parallelism}} to the 
{{parallelism.default}}.

> Changing parallelism is not possible in sql-client.sh
> -
>
> Key: FLINK-18161
> URL: https://issues.apache.org/jira/browse/FLINK-18161
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Priority: Critical
> Fix For: 1.11.0
>
>
> I tried using 
> {code}
> SET execution.parallelism=12
> {code}
> and changing the parallelism in the configuration file.
> My SQL queries were always running with p=1 for all operators.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18161) Changing parallelism is not possible in sql-client.sh

2020-06-05 Thread Dawid Wysakowicz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126873#comment-17126873
 ] 

Dawid Wysakowicz commented on FLINK-18161:
--

{{parallelism.default}} works from {{flink-conf.yaml}}, but there is no way to 
set it from the {{sql-client}}.
{{execution.parallelism}} works only if we remove {{parallelism.default}} from 
{{flink-conf.yaml}}.

> Changing parallelism is not possible in sql-client.sh
> -
>
> Key: FLINK-18161
> URL: https://issues.apache.org/jira/browse/FLINK-18161
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Priority: Critical
> Fix For: 1.11.0
>
>
> I tried using 
> {code}
> SET execution.parallelism=12
> {code}
> and changing the parallelism in the configuration file.
> My SQL queries were always running with p=1 for all operators.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #11586: [FLINK-5552][runtime] make JMXServer static per JVM

2020-06-05 Thread GitBox


flinkbot edited a comment on pull request #11586:
URL: https://github.com/apache/flink/pull/11586#issuecomment-606958289


   
   ## CI report:
   
   * 613a5fd798911229926144246b94635cde46c6ed Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2309)
 
   * 0c0af972ac3f8477eba6c65cfe990a6af0105707 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2841)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-18096) Generated avro formats should support user specified name and namespace

2020-06-05 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126874#comment-17126874
 ] 

Jark Wu commented on FLINK-18096:
-

I see. Thanks for the explanation [~sjwiesman].

> Generated avro formats should support user specified name and namespace
> ---
>
> Key: FLINK-18096
> URL: https://issues.apache.org/jira/browse/FLINK-18096
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Seth Wiesman
>Priority: Major
>
> When avro schema is auto derived it should still be possible to specify 
> namespace and name. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18161) Changing parallelism is not possible in sql-client.sh

2020-06-05 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-18161:

Fix Version/s: 1.11.0

> Changing parallelism is not possible in sql-client.sh
> -
>
> Key: FLINK-18161
> URL: https://issues.apache.org/jira/browse/FLINK-18161
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Priority: Critical
> Fix For: 1.11.0
>
>
> I tried using 
> {code}
> SET execution.parallelism=12
> {code}
> and changing the parallelism in the configuration file.
> My SQL queries were always running with p=1 for all operators.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18161) Changing parallelism is not possible in sql-client.sh

2020-06-05 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126872#comment-17126872
 ] 

Jark Wu commented on FLINK-18161:
-

Does {{parallelism.default}} work? 

> Changing parallelism is not possible in sql-client.sh
> -
>
> Key: FLINK-18161
> URL: https://issues.apache.org/jira/browse/FLINK-18161
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Priority: Critical
> Fix For: 1.11.0
>
>
> I tried using 
> {code}
> SET execution.parallelism=12
> {code}
> and changing the parallelism in the configuration file.
> My SQL queries were always running with p=1 for all operators.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-18096) Generated avro formats should support user specified name and namespace

2020-06-05 Thread Seth Wiesman (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126868#comment-17126868
 ] 

Seth Wiesman edited comment on FLINK-18096 at 6/5/20, 3:19 PM:
---

I certainly want to see registry support, and I believe Kurt has been 
discussing it for 1.12 as well.

This came up for me when I was working with a user earlier this week; we were 
using 1.11 to write Avro data to Kafka. We then had a DataStream application 
that consumed from that topic with a manually created Avro schema based on our 
DDL. Originally we wanted to use a specific record, but were forced to use a 
generic record because of the above-mentioned Java namespace issue. 


was (Author: sjwiesman):
I certainly want to see registry support in 1.12, and I believe Kurt has been 
discussing that as well. This came up for me when I was working with a user 
earlier this week and we were using 1.11 to write Avro data to Kafka. We then 
had a DataStream application that was consuming from that topic and manually 
created an Avro schema based on our DDL. Originally we wanted to use a specific 
record but were forced to use a generic record because of the above-mentioned 
Java namespace issue. 

> Generated avro formats should support user specified name and namespace
> ---
>
> Key: FLINK-18096
> URL: https://issues.apache.org/jira/browse/FLINK-18096
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Seth Wiesman
>Priority: Major
>
> When avro schema is auto derived it should still be possible to specify 
> namespace and name. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18096) Generated avro formats should support user specified name and namespace

2020-06-05 Thread Seth Wiesman (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126868#comment-17126868
 ] 

Seth Wiesman commented on FLINK-18096:
--

I certainly want to see registry support in 1.12, and I believe Kurt has been 
discussing that as well. This came up for me when I was working with a user 
earlier this week and we were using 1.11 to write Avro data to Kafka. We then 
had a DataStream application that was consuming from that topic and manually 
created an Avro schema based on our DDL. Originally we wanted to use a specific 
record but were forced to use a generic record because of the above-mentioned 
Java namespace issue. 
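
A sketch of what that consuming side might have looked like (topic name, schema 
string and Kafka properties are placeholders, not the actual job):
{code}
// Sketch of a DataStream job that reads the Avro records written from the DDL
// as GenericRecord, because no SpecificRecord class matches the generated
// record name/namespace. All literals are placeholders.
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class GenericAvroConsumer {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Schema written by hand to mirror the SQL DDL, since the auto-derived
        // record name/namespace cannot be controlled yet (FLINK-18096).
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"row\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"long\"},"
                        + "{\"name\":\"name\",\"type\":\"string\"}]}");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo");

        DataStream<GenericRecord> records = env.addSource(
                new FlinkKafkaConsumer<>(
                        "my-topic", AvroDeserializationSchema.forGeneric(schema), props));
        records.print();

        env.execute("generic-avro-consumer");
    }
}
{code}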

> Generated avro formats should support user specified name and namespace
> ---
>
> Key: FLINK-18096
> URL: https://issues.apache.org/jira/browse/FLINK-18096
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Seth Wiesman
>Priority: Major
>
> When avro schema is auto derived it should still be possible to specify 
> namespace and name. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

