[flink] branch master updated (83c5e71 -> 5aba616)

2021-07-13 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 83c5e71  [FLINK-23150][table-planner] Remove the old code split 
implementation
 add 1251ab8  [FLINK-21088][runtime][checkpoint] Assign a finished snapshot 
to task if operators are all finished on restore
 add df99b49  [FLINK-21088][runtime][checkpoint] Pass the finished on 
restore status to TaskStateManager
 add 5aba616  [FLINK-21088][runtime][checkpoint] Pass the finish on restore 
status to operator chain

No new revisions were added by this update.

Summary of changes:
 .../api/runtime/SavepointTaskStateManager.java |  5 ++
 .../checkpoint/StateAssignmentOperation.java   | 56 ++
 .../runtime/checkpoint/TaskStateAssignment.java| 11 -
 .../runtime/checkpoint/TaskStateSnapshot.java  | 17 ++-
 .../flink/runtime/state/TaskStateManager.java  |  3 ++
 .../flink/runtime/state/TaskStateManagerImpl.java  |  8 
 .../checkpoint/StateAssignmentOperationTest.java   | 31 
 .../runtime/state/TaskStateManagerImplTest.java| 17 +++
 .../flink/runtime/state/TestTaskStateManager.java  | 10 
 .../streaming/runtime/tasks/OperatorChain.java | 11 -
 .../flink/streaming/runtime/tasks/StreamTask.java  |  6 ++-
 .../operators/StreamOperatorChainingTest.java  |  2 +-
 .../StreamSourceOperatorLatencyMetricsTest.java|  4 +-
 .../tasks/StreamTaskExecutionDecorationTest.java   |  2 +-
 .../tasks/SubtaskCheckpointCoordinatorTest.java| 15 --
 15 files changed, 165 insertions(+), 33 deletions(-)


[flink] branch master updated: [FLINK-23356][hbase] Do not use delegation token in case of keytab

2021-07-13 Thread gyfora
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 657df14  [FLINK-23356][hbase] Do not use delegation token in case of 
keytab
657df14 is described below

commit 657df14677a8f4e500efdc79627f095e32e8c4b4
Author: Gabor Somogyi 
AuthorDate: Mon Jul 12 10:40:54 2021 +0200

[FLINK-23356][hbase] Do not use delegation token in case of keytab

Closes #16466
---
 .../org/apache/flink/runtime/security/modules/HadoopModule.java | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
index 9d23992..78f7ff3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -92,11 +92,13 @@ public class HadoopModule implements SecurityModule {
 // and does not fallback to using Kerberos tickets
 Credentials credentialsToBeAdded = new Credentials();
 final Text hdfsDelegationTokenKind = new 
Text("HDFS_DELEGATION_TOKEN");
+final Text hbaseDelegationTokenKind = new 
Text("HBASE_AUTH_TOKEN");
 Collection> usrTok =
 credentialsFromTokenStorageFile.getAllTokens();
-// If UGI use keytab for login, do not load HDFS 
delegation token.
+// If UGI use keytab for login, do not load HDFS/HBase 
delegation token.
 for (Token token : usrTok) {
-if (!token.getKind().equals(hdfsDelegationTokenKind)) {
+if (!token.getKind().equals(hdfsDelegationTokenKind)
+&& 
!token.getKind().equals(hbaseDelegationTokenKind)) {
 credentialsToBeAdded.addToken(token.getService(), 
token);
 }
 }


[flink] branch master updated: [FLINK-23368][python] Fix the wrong mapping of state cache in PyFlink

2021-07-13 Thread hxb
This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 08e9343  [FLINK-23368][python] Fix the wrong mapping of state cache in 
PyFlink
08e9343 is described below

commit 08e9343b0be8e1d438fe9883f5cc407c5ae9e88e
Author: huangxingbo 
AuthorDate: Tue Jul 13 16:03:16 2021 +0800

[FLINK-23368][python] Fix the wrong mapping of state cache in PyFlink

This closes #16476.
---
 .../pyflink/datastream/tests/test_data_stream.py   | 44 ++
 flink-python/pyflink/fn_execution/state_impl.py|  4 +-
 2 files changed, 31 insertions(+), 17 deletions(-)

diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py 
b/flink-python/pyflink/datastream/tests/test_data_stream.py
index 3d85c4a..027cea3 100644
--- a/flink-python/pyflink/datastream/tests/test_data_stream.py
+++ b/flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -395,7 +395,13 @@ class DataStreamTests(object):
 self.assertEqual(expected, actual)
 
 def test_key_by_map(self):
-ds = self.env.from_collection([('a', 0), ('b', 0), ('c', 1), ('d', 1), 
('e', 2)],
+from pyflink.util.java_utils import get_j_env_configuration
+from pyflink.common import Configuration
+self.env.set_parallelism(1)
+config = Configuration(
+
j_configuration=get_j_env_configuration(self.env._j_stream_execution_environment))
+config.set_integer("python.fn-execution.bundle.size", 1)
+ds = self.env.from_collection([('a', 0), ('b', 1), ('c', 0), ('d', 1), 
('e', 2)],
   type_info=Types.ROW([Types.STRING(), 
Types.INT()]))
 keyed_stream = ds.key_by(MyKeySelector(), key_type=Types.INT())
 
@@ -404,7 +410,6 @@ class DataStreamTests(object):
 
 class AssertKeyMapFunction(MapFunction):
 def __init__(self):
-self.pre = None
 self.state = None
 
 def open(self, runtime_context: RuntimeContext):
@@ -412,28 +417,37 @@ class DataStreamTests(object):
 ValueStateDescriptor("test_state", 
Types.PICKLED_BYTE_ARRAY()))
 
 def map(self, value):
+if value[0] == 'a':
+pass
+elif value[0] == 'b':
+state_value = self._get_state_value()
+assert state_value == 1
+self.state.update(state_value)
+elif value[0] == 'c':
+state_value = self._get_state_value()
+assert state_value == 1
+self.state.update(state_value)
+elif value[0] == 'd':
+state_value = self._get_state_value()
+assert state_value == 2
+self.state.update(state_value)
+else:
+pass
+return value
+
+def _get_state_value(self):
 state_value = self.state.value()
 if state_value is None:
 state_value = 1
 else:
 state_value += 1
-if value[0] == 'b':
-assert self.pre == 'a'
-assert state_value == 2
-if value[0] == 'd':
-assert self.pre == 'c'
-assert state_value == 2
-if value[0] == 'e':
-assert state_value == 1
-self.pre = value[0]
-self.state.update(state_value)
-return value
+return state_value
 
 keyed_stream.map(AssertKeyMapFunction()).add_sink(self.test_sink)
 self.env.execute('key_by_test')
 results = self.test_sink.get_results(True)
-expected = ["Row(f0='e', f1=2)", "Row(f0='a', f1=0)", "Row(f0='b', 
f1=0)",
-"Row(f0='c', f1=1)", "Row(f0='d', f1=1)"]
+expected = ["Row(f0='e', f1=2)", "Row(f0='a', f1=0)", "Row(f0='b', 
f1=1)",
+"Row(f0='c', f1=0)", "Row(f0='d', f1=1)"]
 results.sort()
 expected.sort()
 self.assertEqual(expected, results)
diff --git a/flink-python/pyflink/fn_execution/state_impl.py 
b/flink-python/pyflink/fn_execution/state_impl.py
index 590c1d7..e221e70 100644
--- a/flink-python/pyflink/fn_execution/state_impl.py
+++ b/flink-python/pyflink/fn_execution/state_impl.py
@@ -1060,14 +1060,14 @@ class RemoteKeyedStateBackend(object):
 if key == self._current_key:
 return
 encoded_old_key = self._encoded_current_key
-self._current_key = key
-self._encoded_current_key = 
self._key_coder_impl.encode(self._current_key)
 for state_name, state_obj in self._all_states.items():
 if self._state_cache_size > 0:
  

[flink] branch release-1.13 updated (2c455f3 -> c1f30ee)

2021-07-13 Thread hxb
This is an automated email from the ASF dual-hosted git repository.

hxb pushed a change to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 2c455f3  [FLINK-22766][connector/kafka] Report offsets and Kafka 
consumer metrics in Flink metric group
 add c1f30ee  [FLINK-23368][python] Fix the wrong mapping of state cache in 
PyFlink

No new revisions were added by this update.

Summary of changes:
 .../pyflink/datastream/tests/test_data_stream.py   | 44 ++
 flink-python/pyflink/fn_execution/state_impl.py|  4 +-
 2 files changed, 31 insertions(+), 17 deletions(-)


[flink] branch master updated (08e9343 -> c874338)

2021-07-13 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 08e9343  [FLINK-23368][python] Fix the wrong mapping of state cache in 
PyFlink
 add c874338  [FLINK-23233][runtime] Ensure checkpoints confirmed after all 
the failed events processed for OepratorCoordinator

No new revisions were added by this update.

Summary of changes:
 .../coordination/OperatorCoordinatorHolder.java| 27 +--
 .../operators/coordination/SubtaskGatewayImpl.java | 31 ---
 .../org/apache/flink/runtime/util/Runnables.java   | 12 +++
 .../coordination/EventReceivingTasks.java  |  7 +-
 .../OperatorCoordinatorHolderTest.java | 94 ++
 5 files changed, 135 insertions(+), 36 deletions(-)


[flink] branch release-1.13 updated (c1f30ee -> 1fa52e1)

2021-07-13 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git.


from c1f30ee  [FLINK-23368][python] Fix the wrong mapping of state cache in 
PyFlink
 add 1fa52e1  [FLINK-23233][runtime] Ensure checkpoints confirmed after all 
the failed events processed for OepratorCoordinator

No new revisions were added by this update.

Summary of changes:
 .../coordination/OperatorCoordinatorHolder.java| 27 +--
 .../operators/coordination/SubtaskGatewayImpl.java | 31 ---
 .../org/apache/flink/runtime/util/Runnables.java   | 10 +++
 .../coordination/EventReceivingTasks.java  |  7 +-
 .../OperatorCoordinatorHolderTest.java | 94 ++
 5 files changed, 133 insertions(+), 36 deletions(-)


[flink] branch release-1.12 updated (6292777 -> 2c3f8f6)

2021-07-13 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 6292777  [FLINK-23223] Notifies if there are available data on 
resumption for pipelined subpartition
 add 2c3f8f6  [FLINK-23233][runtime] Ensure checkpoints confirmed after all 
the failed events processed for OepratorCoordinator

No new revisions were added by this update.

Summary of changes:
 .../coordination/OperatorCoordinatorHolder.java| 27 +--
 .../operators/coordination/SubtaskGatewayImpl.java | 31 ---
 .../org/apache/flink/runtime/util/Runnables.java   | 10 +++
 .../coordination/EventReceivingTasks.java  |  7 +-
 .../OperatorCoordinatorHolderTest.java | 94 ++
 5 files changed, 133 insertions(+), 36 deletions(-)


[flink] 01/09: [FLINK-21368] Remove RpcService#getExecutor

2021-07-13 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5d3ca598ab37d3dfec5b01e19080874e541212af
Author: Chesnay Schepler 
AuthorDate: Thu Jul 8 10:51:54 2021 +0200

[FLINK-21368] Remove RpcService#getExecutor
---
 .../apache/flink/runtime/rpc/akka/AkkaRpcService.java| 16 
 .../apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java  |  2 +-
 .../java/org/apache/flink/runtime/rpc/RpcService.java| 16 +---
 .../flink/runtime/registration/RetryingRegistration.java |  8 
 .../runtime/taskexecutor/DefaultJobLeaderService.java|  2 +-
 .../apache/flink/runtime/taskexecutor/TaskExecutor.java  |  6 +++---
 .../registration/RegisteredRpcConnectionTest.java| 10 +-
 .../runtime/registration/RetryingRegistrationTest.java   |  8 +---
 .../apache/flink/runtime/rpc/FencedRpcEndpointTest.java  |  2 +-
 .../taskexecutor/TaskSubmissionTestEnvironment.java  |  4 +++-
 .../OperatorEventSendingCheckpointITCase.java|  6 --
 11 files changed, 28 insertions(+), 52 deletions(-)

diff --git 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index bba81a7..4c1a788 100644
--- 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -40,6 +40,7 @@ import org.apache.flink.util.TimeUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
+import org.apache.flink.util.function.FunctionUtils;
 
 import akka.actor.AbstractActor;
 import akka.actor.ActorRef;
@@ -48,7 +49,6 @@ import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.DeadLetter;
 import akka.actor.Props;
-import akka.dispatch.Futures;
 import akka.pattern.Patterns;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,7 +69,6 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledFuture;
@@ -77,7 +76,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import scala.Option;
-import scala.concurrent.Future;
 import scala.reflect.ClassTag$;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -468,11 +466,6 @@ public class AkkaRpcService implements RpcService {
 }
 
 @Override
-public Executor getExecutor() {
-return actorSystem.dispatcher();
-}
-
-@Override
 public ScheduledExecutor getScheduledExecutor() {
 return internalScheduledExecutor;
 }
@@ -488,14 +481,13 @@ public class AkkaRpcService implements RpcService {
 
 @Override
 public void execute(Runnable runnable) {
-actorSystem.dispatcher().execute(runnable);
+getScheduledExecutor().execute(runnable);
 }
 
 @Override
 public  CompletableFuture execute(Callable callable) {
-Future scalaFuture = Futures.future(callable, 
actorSystem.dispatcher());
-
-return AkkaFutureUtils.toJava(scalaFuture);
+return CompletableFuture.supplyAsync(
+FunctionUtils.uncheckedSupplier(callable::call), 
getScheduledExecutor());
 }
 
 // 
---
diff --git 
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
 
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 01437e4..b522d44 100644
--- 
a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ 
b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -192,7 +192,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
 assertFalse(terminationFuture.isDone());
 
-CompletableFuture.runAsync(rpcEndpoint::closeAsync, 
akkaRpcService.getExecutor());
+CompletableFuture.runAsync(rpcEndpoint::closeAsync, 
akkaRpcService.getScheduledExecutor());
 
 // wait until the rpc endpoint has terminated
 terminationFuture.get();
diff --git 
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
 
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index eb36deb..d8c2d66 100644
--- 
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ 

[flink] 03/09: [hotfix] Remove akka dependency in MetricUtilsTest

2021-07-13 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ae85de7fa7f06b3d3751405d1fe0b57b448ca7f4
Author: Chesnay Schepler 
AuthorDate: Thu Jul 1 10:15:54 2021 +0200

[hotfix] Remove akka dependency in MetricUtilsTest
---
 .../apache/flink/runtime/metrics/util/MetricUtilsTest.java   | 12 +---
 1 file changed, 1 insertion(+), 11 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
index 8f387e8..e3cc553 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcSystem;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServicesBuilder;
 import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable;
@@ -40,7 +39,6 @@ import org.apache.flink.util.function.CheckedSupplier;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
 
-import akka.actor.ActorSystem;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -54,7 +52,6 @@ import java.util.List;
 import static 
org.apache.flink.runtime.metrics.util.MetricUtils.METRIC_GROUP_FLINK;
 import static 
org.apache.flink.runtime.metrics.util.MetricUtils.METRIC_GROUP_MANAGED_MEMORY;
 import static 
org.apache.flink.runtime.metrics.util.MetricUtils.METRIC_GROUP_MEMORY;
-import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
@@ -83,17 +80,10 @@ public class MetricUtilsTest extends TestLogger {
 final RpcService rpcService =
 MetricUtils.startRemoteMetricsRpcService(
 configuration, "localhost", RpcSystem.load());
-assertThat(rpcService, instanceOf(AkkaRpcService.class));
-
-final ActorSystem actorSystem = ((AkkaRpcService) 
rpcService).getActorSystem();
 
 try {
 final int threadPriority =
-actorSystem
-.settings()
-.config()
-
.getInt("akka.actor.default-dispatcher.thread-priority");
-
+rpcService.execute(() -> 
Thread.currentThread().getPriority()).get();
 assertThat(threadPriority, is(expectedThreadPriority));
 } finally {
 rpcService.stopService().get();


[flink] 05/09: [hotfix] Extract logging parent-first patterns into constant

2021-07-13 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9cf4c48945f10db074854828d92ca70280ef23ab
Author: Chesnay Schepler 
AuthorDate: Mon Jul 5 10:56:34 2021 +0200

[hotfix] Extract logging parent-first patterns into constant
---
 docs/layouts/shortcodes/generated/core_configuration.html|  2 +-
 .../shortcodes/generated/expert_class_loading_section.html   |  2 +-
 .../java/org/apache/flink/configuration/CoreOptions.java | 12 +---
 3 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index bf00ecd..43ec684 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -30,7 +30,7 @@ This check should only be disabled if such a leak prevents 
further jobs from run
 
 
 classloader.parent-first-patterns.default
-"java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback;org.xml;javax.xml;org.apache.xerces;org.w3c"
+"java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.xml;javax.xml;org.apache.xerces;org.w3c;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback"
 String
 A (semicolon-separated) list of patterns that specifies which 
classes should always be resolved through the parent ClassLoader first. A 
pattern is a simple prefix that is checked against the fully qualified class 
name. This setting should generally not be modified. To add another pattern we 
recommend to use "classloader.parent-first-patterns.additional" instead.
 
diff --git 
a/docs/layouts/shortcodes/generated/expert_class_loading_section.html 
b/docs/layouts/shortcodes/generated/expert_class_loading_section.html
index a4236f1..c88ac73 100644
--- a/docs/layouts/shortcodes/generated/expert_class_loading_section.html
+++ b/docs/layouts/shortcodes/generated/expert_class_loading_section.html
@@ -30,7 +30,7 @@ This check should only be disabled if such a leak prevents 
further jobs from run
 
 
 classloader.parent-first-patterns.default
-"java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback;org.xml;javax.xml;org.apache.xerces;org.w3c"
+"java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.xml;javax.xml;org.apache.xerces;org.w3c;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback"
 String
 A (semicolon-separated) list of patterns that specifies which 
classes should always be resolved through the parent ClassLoader first. A 
pattern is a simple prefix that is checked against the fully qualified class 
name. This setting should generally not be modified. To add another pattern we 
recommend to use "classloader.parent-first-patterns.additional" instead.
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 7fde49a..db21d21 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.configuration;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.docs.ConfigGroup;
 import org.apache.flink.annotation.docs.ConfigGroups;
@@ -34,6 +35,10 @@ import static 
org.apache.flink.configuration.ConfigOptions.key;
 @ConfigGroups(groups = {@ConfigGroup(name = "Environment", keyPrefix = "env")})
 public class CoreOptions {
 
+@Internal
+public static final String PARENT_FIRST_LOGGING_PATTERNS =
+
"org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback";
+
 // 
 //  Classloading Parameters
 // 
@@ -92,7 +97,8 @@ public class CoreOptions {
 public static final ConfigOption 
ALWAYS_PARENT_FIRST_LOADER_PATTERNS =
 ConfigOptions.key("classloader.parent-first-patterns.default")
 .defaultValue(
-
"java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging;org.apach

[flink] branch master updated (c874338 -> e341e47)

2021-07-13 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from c874338  [FLINK-23233][runtime] Ensure checkpoints confirmed after all 
the failed events processed for OepratorCoordinator
 new 5d3ca59  [FLINK-21368] Remove RpcService#getExecutor
 new e2d36ba  [hotfix] Replace AskTimeException usages in tests
 new ae85de7  [hotfix] Remove akka dependency in MetricUtilsTest
 new b8ee568  [hotfix] Allow @Internal annotation on fields
 new 9cf4c48  [hotfix] Extract logging parent-first patterns into constant
 new 548d70a  [FLINK-18783] RpcSystem extends AutoCloseable
 new ad2f2bc  [FLINK-18783] RpcSystem#load accepts Configuration
 new 1aa7d16  [FLINK-18783] Add ComponentClassLoader
 new e341e47  [FLINK-18783] Load Akka with separate classloader

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../resource-providers/standalone/kubernetes.md|   4 +-
 .../resource-providers/standalone/kubernetes.md|   4 +-
 .../shortcodes/generated/core_configuration.html   |   2 +-
 .../generated/expert_class_loading_section.html|   2 +-
 .../java/org/apache/flink/annotation/Internal.java |   2 +-
 flink-clients/pom.xml  |   6 +-
 flink-connectors/flink-connector-base/pom.xml  |   2 +-
 flink-connectors/flink-connector-cassandra/pom.xml |   2 +-
 .../flink-connector-elasticsearch-base/pom.xml |   2 +-
 .../flink-connector-elasticsearch5/pom.xml |   2 +-
 .../flink-connector-elasticsearch6/pom.xml |   2 +-
 .../flink-connector-elasticsearch7/pom.xml |   2 +-
 flink-connectors/flink-connector-files/pom.xml |   2 +-
 .../flink-connector-gcp-pubsub/pom.xml |   2 +-
 flink-connectors/flink-connector-hbase-1.4/pom.xml |   2 +-
 flink-connectors/flink-connector-hive/pom.xml  |   2 +-
 flink-connectors/flink-connector-jdbc/pom.xml  |   2 +-
 flink-connectors/flink-connector-kafka/pom.xml |   6 +-
 flink-connectors/flink-connector-kinesis/pom.xml   |   4 +-
 flink-connectors/flink-connector-rabbitmq/pom.xml  |   4 +-
 flink-container/pom.xml|   2 +-
 .../apache/flink/configuration/CoreOptions.java|  15 +-
 .../core/classloading/ComponentClassLoader.java| 266 ++
 .../core/classloading/SubmoduleClassLoader.java|  45 +++
 .../org/apache/flink/core/plugin/PluginLoader.java |  96 +-
 .../apache/flink/util/concurrent/FutureUtils.java  |  26 +-
 .../classloading/ComponentClassLoaderTest.java | 268 +++
 flink-dist/pom.xml |  17 +-
 flink-dist/src/main/assemblies/opt.xml |   4 +-
 flink-dist/src/main/assemblies/plugins.xml |   6 +-
 .../src/main/flink-bin/conf/log4j-cli.properties   |   2 +-
 .../main/flink-bin/conf/log4j-console.properties   |   2 +-
 .../main/flink-bin/conf/log4j-session.properties   |   2 +-
 .../src/main/flink-bin/conf/log4j.properties   |   2 +-
 .../src/main/flink-bin/conf/logback-console.xml|   2 +-
 flink-dist/src/main/flink-bin/conf/logback.xml |   2 +-
 flink-dist/src/main/resources/META-INF/NOTICE  |  12 -
 flink-docs/pom.xml |   8 +-
 .../pom.xml|   2 +-
 .../flink-end-to-end-tests-common/pom.xml  |   2 +-
 .../flink-metrics-availability-test/pom.xml|   2 +-
 .../flink-metrics-reporter-prometheus-test/pom.xml |   2 +-
 flink-end-to-end-tests/test-scripts/common.sh  |   4 +-
 flink-examples/flink-examples-streaming/pom.xml|   2 +-
 flink-examples/flink-examples-table/pom.xml|   2 +-
 flink-formats/flink-avro/pom.xml   |   2 +-
 flink-formats/flink-compress/pom.xml   |   2 +-
 flink-formats/flink-csv/pom.xml|   2 +-
 flink-formats/flink-hadoop-bulk/pom.xml|   2 +-
 flink-formats/flink-json/pom.xml   |   2 +-
 flink-formats/flink-orc/pom.xml|   2 +-
 flink-formats/flink-parquet/pom.xml|   2 +-
 flink-formats/flink-sequence-file/pom.xml  |   2 +-
 flink-fs-tests/pom.xml |   4 +-
 flink-kubernetes/pom.xml   |   6 +-
 flink-libraries/flink-cep/pom.xml  |   4 +-
 flink-libraries/flink-gelly/pom.xml|   2 +-
 flink-libraries/flink-state-processing-api/pom.xml |   4 +-
 flink-metrics/flink-metrics-dropwizard/pom.xml |   4 +-
 flink-metrics/flink-metrics-influxdb/pom.xml   |   6 +-
 flink-metrics/flink-metrics-jmx/pom.xml|   8 +-
 flink-metrics/flink-metrics-prometheus/pom.xml |   6 +-
 flink-metrics/flink-metrics-

[flink] 06/09: [FLINK-18783] RpcSystem extends AutoCloseable

2021-07-13 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 548d70aa50a43b9a8c7c901bdefbb75b85959904
Author: Chesnay Schepler 
AuthorDate: Mon Jul 12 09:44:05 2021 +0200

[FLINK-18783] RpcSystem extends AutoCloseable
---
 .../src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java |  6 +-
 .../apache/flink/runtime/entrypoint/ClusterEntrypoint.java|  9 +++--
 .../org/apache/flink/runtime/minicluster/MiniCluster.java | 11 ++-
 .../apache/flink/runtime/taskexecutor/TaskManagerRunner.java  |  9 +++--
 4 files changed, 29 insertions(+), 6 deletions(-)

diff --git 
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
 
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
index 3bc19bd..adc31c6 100644
--- 
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
+++ 
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
@@ -27,7 +27,7 @@ import java.util.ServiceLoader;
  * This interface serves as a factory interface for RPC services, with some 
additional utilities
  * that are reliant on implementation details of the RPC service.
  */
-public interface RpcSystem extends RpcSystemUtils {
+public interface RpcSystem extends RpcSystemUtils, AutoCloseable {
 
 /**
  * Returns a builder for an {@link RpcService} that is only reachable from 
the local machine.
@@ -51,6 +51,10 @@ public interface RpcSystem extends RpcSystemUtils {
 @Nullable String externalAddress,
 String externalPortRange);
 
+/** Hook to cleanup resources, like common thread pools or classloaders. */
+@Override
+default void close() {}
+
 /** Builder for {@link RpcService}. */
 interface RpcServiceBuilder {
 RpcServiceBuilder withComponentName(String name);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index c0d3063..8b24dfa 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -150,6 +150,7 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
 private ExecutionGraphInfoStore executionGraphInfoStore;
 
 private final Thread shutDownHook;
+private RpcSystem rpcSystem;
 
 protected ClusterEntrypoint(Configuration configuration) {
 this.configuration = generateClusterConfiguration(configuration);
@@ -293,7 +294,7 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
 LOG.info("Initializing cluster services.");
 
 synchronized (lock) {
-final RpcSystem rpcSystem = RpcSystem.load();
+rpcSystem = RpcSystem.load();
 
 commonRpcService =
 RpcUtils.createRemoteRpcService(
@@ -499,8 +500,12 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
 FutureUtils.composeAfterwards(
 shutDownApplicationFuture, () -> 
stopClusterServices(cleanupHaData));
 
+final CompletableFuture rpcSystemClassLoaderCloseFuture =
+FutureUtils.runAfterwards(serviceShutdownFuture, 
rpcSystem::close);
+
 final CompletableFuture cleanupDirectoriesFuture =
-FutureUtils.runAfterwards(serviceShutdownFuture, 
this::cleanupDirectories);
+FutureUtils.runAfterwards(
+rpcSystemClassLoaderCloseFuture, 
this::cleanupDirectories);
 
 cleanupDirectoriesFuture.whenComplete(
 (Void ignored2, Throwable serviceThrowable) -> {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 3f884c8..c9f24d8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -197,6 +197,9 @@ public class MiniCluster implements AutoCloseableAsync {
 /** Flag marking the mini cluster as started/running. */
 private volatile boolean running;
 
+@GuardedBy("lock")
+private RpcSystem rpcSystem;
+
 // 
 
 /**
@@ -273,7 +276,7 @@ public class MiniCluster implements AutoCloseableAsync {
 try {
 initializeIOFormatClasses(configuration);
 
-final RpcSystem rpcSystem = RpcSystem.load();
+rpcSystem = RpcSystem.load();
 
 

[flink] 04/09: [hotfix] Allow @Internal annotation on fields

2021-07-13 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b8ee5687bcf218d7f36bce082b06344eb83d9a01
Author: Chesnay Schepler 
AuthorDate: Mon Jul 5 10:53:41 2021 +0200

[hotfix] Allow @Internal annotation on fields
---
 .../src/main/java/org/apache/flink/annotation/Internal.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-annotations/src/main/java/org/apache/flink/annotation/Internal.java 
b/flink-annotations/src/main/java/org/apache/flink/annotation/Internal.java
index 8c2baf9..6531c84 100644
--- a/flink-annotations/src/main/java/org/apache/flink/annotation/Internal.java
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/Internal.java
@@ -29,6 +29,6 @@ import java.lang.annotation.Target;
  * Developer APIs are stable but internal to Flink and might change across 
releases.
  */
 @Documented
-@Target({ElementType.TYPE, ElementType.METHOD, ElementType.CONSTRUCTOR})
+@Target({ElementType.TYPE, ElementType.METHOD, ElementType.CONSTRUCTOR, 
ElementType.FIELD})
 @Public
 public @interface Internal {}


[flink] 02/09: [hotfix] Replace AskTimeException usages in tests

2021-07-13 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e2d36ba8f0e8b852f01ba6cef8924a6213a82f29
Author: Chesnay Schepler 
AuthorDate: Thu Jul 1 10:16:09 2021 +0200

[hotfix] Replace AskTimeException usages in tests
---
 .../resourcemanager/slotmanager/DeclarativeSlotManagerTest.java | 3 +--
 .../resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java| 6 +++---
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
index d325bbf..5b826db 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
@@ -56,7 +56,6 @@ import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
 
-import akka.pattern.AskTimeoutException;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -602,7 +601,7 @@ public class DeclarativeSlotManagerTest extends TestLogger {
 final BlockingQueue>> 
responseQueue =
 new ArrayBlockingQueue<>(2);
 responseQueue.add(
-() -> FutureUtils.completedExceptionally(new 
AskTimeoutException("timeout")));
+() -> FutureUtils.completedExceptionally(new 
TimeoutException("timeout")));
 responseQueue.add(
 () -> {
 secondSlotRequestFuture.complete(null);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java
index 62ecfe3..d00bbd2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java
@@ -36,11 +36,11 @@ import org.apache.flink.runtime.testutils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
 
-import akka.pattern.AskTimeoutException;
 import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
@@ -126,7 +126,7 @@ public class DefaultSlotStatusSyncerTest extends TestLogger 
{
 .setRequestSlotFunction(
 ignored ->
 FutureUtils.completedExceptionally(
-new 
AskTimeoutException("timeout")))
+new 
TimeoutException("timeout")))
 .createTestingTaskExecutorGateway();
 final TaskExecutorConnection taskExecutorConnection =
 new TaskExecutorConnection(ResourceID.generate(), 
taskExecutorGateway);
@@ -151,7 +151,7 @@ public class DefaultSlotStatusSyncerTest extends TestLogger 
{
 try {
 allocatedFuture.get();
 } catch (Exception e) {
-assertThat(e.getCause(), instanceOf(AskTimeoutException.class));
+assertThat(e.getCause(), instanceOf(TimeoutException.class));
 }
 assertThat(resourceTracker.getAcquiredResources(jobId), is(empty()));
 assertThat(


[flink] 07/09: [FLINK-18783] RpcSystem#load accepts Configuration

2021-07-13 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ad2f2bccf5f933a85cae9417e1fe0559075ece84
Author: Chesnay Schepler 
AuthorDate: Thu Jul 1 13:59:01 2021 +0200

[FLINK-18783] RpcSystem#load accepts Configuration
---
 .../src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java  | 10 ++
 .../org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java |  2 +-
 .../java/org/apache/flink/runtime/minicluster/MiniCluster.java |  2 +-
 .../apache/flink/runtime/taskexecutor/TaskManagerRunner.java   |  2 +-
 4 files changed, 13 insertions(+), 3 deletions(-)

diff --git 
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
 
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
index adc31c6..5934c60 100644
--- 
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
+++ 
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
@@ -78,6 +78,16 @@ public interface RpcSystem extends RpcSystemUtils, 
AutoCloseable {
  * @return loaded RpcSystem
  */
 static RpcSystem load() {
+return load(new Configuration());
+}
+
+/**
+ * Loads the RpcSystem.
+ *
+ * @param config Flink configuration
+ * @return loaded RpcSystem
+ */
+static RpcSystem load(Configuration config) {
 final ClassLoader classLoader = RpcSystem.class.getClassLoader();
 return ServiceLoader.load(RpcSystem.class, 
classLoader).iterator().next();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 8b24dfa..07b0a71 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -294,7 +294,7 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
 LOG.info("Initializing cluster services.");
 
 synchronized (lock) {
-rpcSystem = RpcSystem.load();
+rpcSystem = RpcSystem.load(configuration);
 
 commonRpcService =
 RpcUtils.createRemoteRpcService(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index c9f24d8..d083783 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -276,7 +276,7 @@ public class MiniCluster implements AutoCloseableAsync {
 try {
 initializeIOFormatClasses(configuration);
 
-rpcSystem = RpcSystem.load();
+rpcSystem = RpcSystem.load(configuration);
 
 LOG.info("Starting Metrics Registry");
 metricRegistry =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index aaf7aa1..48eec9f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -139,7 +139,7 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
 throws Exception {
 this.configuration = checkNotNull(configuration);
 
-rpcSystem = RpcSystem.load();
+rpcSystem = RpcSystem.load(configuration);
 
 timeout = 
Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
 


[flink] 08/09: [FLINK-18783] Add ComponentClassLoader

2021-07-13 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1aa7d16c1188dd70dd7cbf1a7e4acf0e0db8cea1
Author: Chesnay Schepler 
AuthorDate: Tue Jul 6 16:59:30 2021 +0200

[FLINK-18783] Add ComponentClassLoader

The new ComponentClassLoader is intended as a CL that can eventually 
support all of our class-loading needs, including plugins, rpc systems and 
user-code.
---
 .../core/classloading/ComponentClassLoader.java| 266 
 .../org/apache/flink/core/plugin/PluginLoader.java |  96 +---
 .../classloading/ComponentClassLoaderTest.java | 268 +
 3 files changed, 537 insertions(+), 93 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/classloading/ComponentClassLoader.java
 
b/flink-core/src/main/java/org/apache/flink/core/classloading/ComponentClassLoader.java
new file mode 100644
index 000..39636a6
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/classloading/ComponentClassLoader.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.classloading;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.Iterator;
+
+/**
+ * A {@link URLClassLoader} that restricts which classes can be loaded to 
those contained within the
+ * given classpath, except classes from a given set of packages that are 
either loaded owner or
+ * component-first.
+ *
+ * Depiction of the class loader hierarchy:
+ *
+ * 
+ *   Owner Bootstrap
+ *   ^ ^
+ *   |-|
+ *|
+ *Component
+ * 
+ *
+ * For loading classes/resources, class loaders are accessed in one of the 
following orders:
+ *
+ * 
+ *   component-only: component -> bootstrap; default.
+ *   component-first: component -> bootstrap -> owner; opt-in.
+ *   owner-first: owner -> component -> bootstrap; opt-in.
+ * 
+ */
+public class ComponentClassLoader extends URLClassLoader {
+private static final ClassLoader PLATFORM_OR_BOOTSTRAP_LOADER;
+
+private final ClassLoader ownerClassLoader;
+
+private final String[] ownerFirstPackages;
+private final String[] componentFirstPackages;
+private final String[] ownerFirstResourcePrefixes;
+private final String[] componentFirstResourcePrefixes;
+
+public ComponentClassLoader(
+URL[] classpath,
+ClassLoader ownerClassLoader,
+String[] ownerFirstPackages,
+String[] componentFirstPackages) {
+super(classpath, PLATFORM_OR_BOOTSTRAP_LOADER);
+this.ownerClassLoader = ownerClassLoader;
+
+this.ownerFirstPackages = ownerFirstPackages;
+this.componentFirstPackages = componentFirstPackages;
+
+ownerFirstResourcePrefixes = 
convertPackagePrefixesToPathPrefixes(ownerFirstPackages);
+componentFirstResourcePrefixes =
+convertPackagePrefixesToPathPrefixes(componentFirstPackages);
+}
+
+// 
--
+// Class loading
+// 
--
+
+@Override
+protected Class loadClass(final String name, final boolean resolve)
+throws ClassNotFoundException {
+synchronized (getClassLoadingLock(name)) {
+final Class loadedClass = findLoadedClass(name);
+if (loadedClass != null) {
+return resolveIfNeeded(resolve, loadedClass);
+}
+
+if (isComponentFirstClass(name)) {
+return loadClassFromComponentFirst(name, resolve);
+}
+if (isOwnerFirstClass(name)) {
+return loadClassFromOwnerFirst(name, resolve);
+}
+
+

[flink-training] branch master updated (dd7be31 -> 5c8386d)

2021-07-13 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git.


from dd7be31  [FLINK-22868] Update to use Flink 1.13.1
 new 82620e0  [FLINK-23332][gradle] update Gradle version
 new 1504a64  [hotfix][gradle] update build-scan plugin version
 new 5c8386d  [hotfix][gradle] set project properties in a central place

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 build.gradle | 14 --
 gradle/wrapper/gradle-wrapper.properties |  2 +-
 settings.gradle  |  2 +-
 3 files changed, 10 insertions(+), 8 deletions(-)


[flink-training] 03/03: [hotfix][gradle] set project properties in a central place

2021-07-13 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git

commit 5c8386d022719f72fbb161c6727c0c2e2a3c12f1
Author: Nico Kruber 
AuthorDate: Fri Jul 2 11:36:27 2021 +0200

[hotfix][gradle] set project properties in a central place

Also, we actually only have the root project's description, so let's not set
it for all subprojects!
---
 build.gradle | 12 +++-
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/build.gradle b/build.gradle
index eb42442..7697737 100644
--- a/build.gradle
+++ b/build.gradle
@@ -19,6 +19,13 @@ plugins {
 id 'com.github.johnrengelman.shadow' version '7.0.0' apply false
 }
 
+description = "Flink Training Exercises"
+
+allprojects {
+group = 'org.apache.flink'
+version = '1.13-SNAPSHOT'
+}
+
 subprojects {
 apply plugin: 'java'
 apply plugin: 'scala' // optional; uncomment if needed
@@ -29,11 +36,6 @@ subprojects {
 apply plugin: 'checkstyle'
 apply plugin: 'eclipse'
 
-// artifact properties
-group = 'org.apache.flink'
-version = '1.13-SNAPSHOT'
-description = """Flink Training Exercises"""
-
 ext {
 javaVersion = '1.8'
 flinkVersion = '1.13.1'


[flink-training] 02/03: [hotfix][gradle] update build-scan plugin version

2021-07-13 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git

commit 1504a6477abc465775a39d9073b184e36659225d
Author: Nico Kruber 
AuthorDate: Fri Jul 2 11:17:24 2021 +0200

[hotfix][gradle] update build-scan plugin version
---
 settings.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/settings.gradle b/settings.gradle
index 24ea327..5e98871 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,5 +1,5 @@
 plugins {
-id "com.gradle.enterprise" version "3.2.1"
+id "com.gradle.enterprise" version "3.6.3"
 }
 
 rootProject.name = 'flink-training'


[flink-training] 01/03: [FLINK-23332][gradle] update Gradle version

2021-07-13 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git

commit 82620e098fa39c71538144b7f6a115163b071ccf
Author: Nico Kruber 
AuthorDate: Fri Jul 2 11:09:02 2021 +0200

[FLINK-23332][gradle] update Gradle version

This also requires a newer 'shadow' version.
---
 build.gradle | 2 +-
 gradle/wrapper/gradle-wrapper.properties | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/build.gradle b/build.gradle
index b516b5d..eb42442 100644
--- a/build.gradle
+++ b/build.gradle
@@ -16,7 +16,7 @@
  */
 
 plugins {
-id 'com.github.johnrengelman.shadow' version '5.2.0' apply false
+id 'com.github.johnrengelman.shadow' version '7.0.0' apply false
 }
 
 subprojects {
diff --git a/gradle/wrapper/gradle-wrapper.properties 
b/gradle/wrapper/gradle-wrapper.properties
index 6254d2d..69a9715 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-6.2.1-all.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-7.1-bin.zip
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists


[flink-training] branch master updated: [FLINK-23334][gradle] let the subprojects decide whether they implement an application

2021-07-13 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git


The following commit(s) were added to refs/heads/master by this push:
 new 63d360e  [FLINK-23334][gradle] let the subprojects decide whether they 
implement an application
63d360e is described below

commit 63d360e49368041a4bbdd320a5c4b3600961dab2
Author: Nico Kruber 
AuthorDate: Fri Jul 2 11:37:19 2021 +0200

[FLINK-23334][gradle] let the subprojects decide whether they implement an 
application

This gives a cleaner gradle project file layout.
---
 build.gradle  | 7 ---
 hourly-tips/build.gradle  | 2 ++
 long-ride-alerts/build.gradle | 2 ++
 ride-cleansing/build.gradle   | 2 ++
 rides-and-fares/build.gradle  | 2 ++
 5 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/build.gradle b/build.gradle
index 7697737..5d0c8e9 100644
--- a/build.gradle
+++ b/build.gradle
@@ -29,9 +29,6 @@ allprojects {
 subprojects {
 apply plugin: 'java'
 apply plugin: 'scala' // optional; uncomment if needed
-if (project != project(":common")) {
-apply plugin: 'application'
-}
 apply plugin: 'com.github.johnrengelman.shadow'
 apply plugin: 'checkstyle'
 apply plugin: 'eclipse'
@@ -120,10 +117,6 @@ subprojects {
 }
 }
 
-if (plugins.findPlugin('application')) {
-applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
-run.classpath = sourceSets.main.runtimeClasspath
-}
 
 jar {
 manifest {
diff --git a/hourly-tips/build.gradle b/hourly-tips/build.gradle
index 11df511..75e3e60 100644
--- a/hourly-tips/build.gradle
+++ b/hourly-tips/build.gradle
@@ -1 +1,3 @@
+apply plugin: 'application'
+
 mainClassName = 
'org.apache.flink.training.exercises.hourlytips.HourlyTipsExercise'
diff --git a/long-ride-alerts/build.gradle b/long-ride-alerts/build.gradle
index 62e4ffd..59b9448 100644
--- a/long-ride-alerts/build.gradle
+++ b/long-ride-alerts/build.gradle
@@ -1 +1,3 @@
+apply plugin: 'application'
+
 mainClassName = 
'org.apache.flink.training.exercises.longrides.LongRidesExercise'
diff --git a/ride-cleansing/build.gradle b/ride-cleansing/build.gradle
index 106e3b9..1f03917 100644
--- a/ride-cleansing/build.gradle
+++ b/ride-cleansing/build.gradle
@@ -1 +1,3 @@
+apply plugin: 'application'
+
 mainClassName = 
'org.apache.flink.training.exercises.ridecleansing.RideCleansingExercise'
diff --git a/rides-and-fares/build.gradle b/rides-and-fares/build.gradle
index 18366af..565aec6 100644
--- a/rides-and-fares/build.gradle
+++ b/rides-and-fares/build.gradle
@@ -1 +1,3 @@
+apply plugin: 'application'
+
 mainClassName = 
'org.apache.flink.training.exercises.ridesandfares.RidesAndFaresExercise'


[flink-training] branch master updated: [FLINK-23336][log4j] update Log4J to 2.12.1 as used by Flink 1.13

2021-07-13 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git


The following commit(s) were added to refs/heads/master by this push:
 new 7c323f4  [FLINK-23336][log4j] update Log4J to 2.12.1 as used by Flink 
1.13
7c323f4 is described below

commit 7c323f4bb0a4659a57498c099dbe900ba023d43d
Author: Nico Kruber 
AuthorDate: Fri Jul 2 11:42:41 2021 +0200

[FLINK-23336][log4j] update Log4J to 2.12.1 as used by Flink 1.13
---
 build.gradle|  9 +
 .../src/main/resources/log4j2.properties| 13 +++--
 .../src/main/resources/log4j2.properties| 13 +++--
 .../src/main/resources/log4j2.properties| 13 +++--
 .../src/main/resources/log4j2.properties| 13 +++--
 .../checkpointing/src/main/resources/log4j2.properties  | 13 +++--
 .../src/main/resources/log4j2.properties| 13 +++--
 .../introduction/src/main/resources/log4j2.properties   | 13 +++--
 .../object-reuse/src/main/resources/log4j2.properties   | 13 +++--
 .../exercise/src/main/resources/log4j2.properties   | 13 +++--
 .../src/main/resources/log4j2.properties| 13 +++--
 .../solution/src/main/resources/log4j2.properties   | 13 +++--
 .../throughput/src/main/resources/log4j2.properties | 13 +++--
 13 files changed, 89 insertions(+), 76 deletions(-)

diff --git a/build.gradle b/build.gradle
index 5d0c8e9..6077960 100644
--- a/build.gradle
+++ b/build.gradle
@@ -37,8 +37,7 @@ subprojects {
 javaVersion = '1.8'
 flinkVersion = '1.13.1'
 scalaBinaryVersion = '2.12'
-slf4jVersion = '1.7.15'
-log4jVersion = '1.2.17'
+log4jVersion = '2.12.1'
 junitVersion = '4.12'
 }
 
@@ -73,6 +72,7 @@ subprojects {
 flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 
'jsr305'
 flinkShadowJar.exclude group: 'org.slf4j'
 flinkShadowJar.exclude group: 'log4j'
+flinkShadowJar.exclude group: 'org.apache.logging.log4j', module: 
'log4j-to-slf4j'
 
 // already provided dependencies from serializer frameworks
 flinkShadowJar.exclude group: 'com.esotericsoftware.kryo', module: 
'kryo'
@@ -82,8 +82,9 @@ subprojects {
 
 // common set of dependencies
 dependencies {
-implementation "log4j:log4j:${log4jVersion}"
-implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"
+implementation 
"org.apache.logging.log4j:log4j-slf4j-impl:${log4jVersion}"
+implementation "org.apache.logging.log4j:log4j-api:${log4jVersion}"
+implementation "org.apache.logging.log4j:log4j-core:${log4jVersion}"
 
 if (project != project(":common")) {
 implementation project(path: ':common')
diff --git a/rides-and-fares/src/main/resources/log4j.properties 
b/hourly-tips/src/main/resources/log4j2.properties
similarity index 78%
copy from rides-and-fares/src/main/resources/log4j.properties
copy to hourly-tips/src/main/resources/log4j2.properties
index da32ea0..8319d24 100644
--- a/rides-and-fares/src/main/resources/log4j.properties
+++ b/hourly-tips/src/main/resources/log4j2.properties
@@ -15,9 +15,10 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 

-
-log4j.rootLogger=INFO, console
-
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n
+loogers=rootLooger
+appender.console.type=Console
+appender.console.name=STDOUT
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - 
%msg%n
+rootLogger.level=INFO
+rootLogger.appenderRef.console.ref=STDOUT
\ No newline at end of file
diff --git a/hourly-tips/src/main/resources/log4j.properties 
b/long-ride-alerts/src/main/resources/log4j2.properties
similarity index 78%
rename from hourly-tips/src/main/resources/log4j.properties
rename to long-ride-alerts/src/main/resources/log4j2.properties
index da32ea0..8319d24 100644
--- a/hourly-tips/src/main/resources/log4j.properties
+++ b/long-ride-alerts/src/main/resources/log4j2.properties
@@ -15,9 +15,10 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 

-
-log4j.rootLogger=INFO, console
-
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.Conversion

[flink-training] branch master updated: [FLINK-23337][gradle] Properly use the 'shadow' plugin

2021-07-13 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git


The following commit(s) were added to refs/heads/master by this push:
 new 453d65a  [FLINK-23337][gradle] Properly use the 'shadow' plugin
453d65a is described below

commit 453d65a3e0bfe244427a6e025a1798c677073289
Author: Nico Kruber 
AuthorDate: Fri Jul 2 17:10:03 2021 +0200

[FLINK-23337][gradle] Properly use the 'shadow' plugin

This removes the need for the custom `flinkShadowJar` configuration and 
instead
defines dependencies with the default ways that the 'shadow' plugin offers.
---
 build.gradle| 69 ++---
 common/build.gradle |  8 ++-
 2 files changed, 30 insertions(+), 47 deletions(-)

diff --git a/build.gradle b/build.gradle
index 6077960..b13aaa9 100644
--- a/build.gradle
+++ b/build.gradle
@@ -60,62 +60,38 @@ subprojects {
 }
 }
 
-// NOTE: We cannot use "compileOnly" or "shadow" configurations since then 
we could not run code
-// in the IDE or with "gradle run". We also cannot exclude transitive 
dependencies from the
-// shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
-// -> Explicitly define the // libraries we want to be included in the 
"flinkShadowJar" configuration!
-configurations {
-flinkShadowJar // dependencies which go into the shadowJar
-
-// provided by Flink
-flinkShadowJar.exclude group: 'org.apache.flink', module: 
'force-shading'
-flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 
'jsr305'
-flinkShadowJar.exclude group: 'org.slf4j'
-flinkShadowJar.exclude group: 'log4j'
-flinkShadowJar.exclude group: 'org.apache.logging.log4j', module: 
'log4j-to-slf4j'
-
-// already provided dependencies from serializer frameworks
-flinkShadowJar.exclude group: 'com.esotericsoftware.kryo', module: 
'kryo'
-flinkShadowJar.exclude group: 'javax.servlet', module: 'servlet-api'
-flinkShadowJar.exclude group: 'org.apache.httpcomponents', module: 
'httpclient'
-}
-
 // common set of dependencies
 dependencies {
-implementation 
"org.apache.logging.log4j:log4j-slf4j-impl:${log4jVersion}"
-implementation "org.apache.logging.log4j:log4j-api:${log4jVersion}"
-implementation "org.apache.logging.log4j:log4j-core:${log4jVersion}"
+shadow "org.apache.logging.log4j:log4j-slf4j-impl:${log4jVersion}"
+shadow "org.apache.logging.log4j:log4j-api:${log4jVersion}"
+shadow "org.apache.logging.log4j:log4j-core:${log4jVersion}"
+
+shadow 
"org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}"
+shadow "org.apache.flink:flink-java:${flinkVersion}"
+shadow 
"org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
+shadow 
"org.apache.flink:flink-streaming-scala_${scalaBinaryVersion}:${flinkVersion}"
 
 if (project != project(":common")) {
 implementation project(path: ':common')
-// transitive dependencies for flinkShadowJar need to be defined 
above
-// (the alternative of using configuration: 'shadow' does not work 
there because that adds a dependency on
-// the jar file, not the sources)
-flinkShadowJar project(path: ':common', transitive: false)
-
 testImplementation(project(":common")) {
 capabilities { requireCapability("$group:common-test") }
 }
 }
 }
 
-// make flinkShadowJar dependencies available:
+// add solution source dirs:
 sourceSets {
 main.java.srcDirs += 'src/solution/java'
 main.scala.srcDirs += 'src/solution/scala'
-main.compileClasspath += configurations.flinkShadowJar
-main.runtimeClasspath += configurations.flinkShadowJar
 
-test.compileClasspath += configurations.flinkShadowJar
-test.runtimeClasspath += configurations.flinkShadowJar
+// Add shadow configuration to runtime class path so that the
+// dynamically-generated tasks by IntelliJ are able to run and have
+// all dependencies they need. (Luckily, this does not influence what
+// ends up in the final shadowJar.)
+main.runtimeClasspath += configurations.shadow
 
-javadoc.classpath += configurations.flinkShadowJar
-}
-
-eclipse {
-classpath {
-plusConfigurations += [configurations.flinkShadowJar]
-}
+test.compileClasspath += configurations.shadow
+test.runtimeClasspath += configurations.shadow
 }
 
 
@@ -127,7 +103,18 @@ subprojects {
 }
 
 shadowJar {
-configurations = [project.configurations.flinkShadowJar]
+mergeServiceFiles()
+dependencies {
+exclude(dependency("org.apache.

[flink] branch release-1.13 updated: [hotfix][runtime] Log slot pool status if unable to fulfill job requirements

2021-07-13 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
 new 037687c  [hotfix][runtime] Log slot pool status if unable to fulfill 
job requirements
037687c is described below

commit 037687c2ea1f1b81cc77b14c11ad529b30da4db5
Author: Roman Khachatryan 
AuthorDate: Mon Jul 12 02:27:47 2021 +0200

[hotfix][runtime] Log slot pool status if unable to fulfill job requirements
---
 .../runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java| 9 ++---
 .../runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java   | 8 
 .../resourcemanager/slotmanager/DeclarativeSlotManager.java  | 5 -
 3 files changed, 18 insertions(+), 4 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index 1e9a2b4..c9dafd2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -348,14 +348,17 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
 Collection acquiredResources) {
 assertRunningInMainThread();
 
-failPendingRequests();
+failPendingRequests(acquiredResources);
 }
 
-private void failPendingRequests() {
+private void failPendingRequests(Collection 
acquiredResources) {
 if (!pendingRequests.isEmpty()) {
 final NoResourceAvailableException cause =
 new NoResourceAvailableException(
-"Could not acquire the minimum required 
resources.");
+"Could not acquire the minimum required resources. 
Acquired: "
++ acquiredResources
++ ". Current slot pool status: "
++ getSlotServiceStatus());
 
 cancelPendingRequests(
 request -> !isBatchSlotRequestTimeoutCheckDisabled || 
!request.isBatchRequest(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
index 2c19471..ec1a970 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
@@ -306,4 +306,12 @@ public class DeclarativeSlotPoolService implements 
SlotPoolService {
 STARTED,
 CLOSED,
 }
+
+protected String getSlotServiceStatus() {
+return String.format(
+"Registered TMs: %d, registered slots: %d free slots: %d",
+registeredTaskManagers.size(),
+declarativeSlotPool.getAllSlotsInformation().size(),
+declarativeSlotPool.getFreeSlotsInformation().size());
+}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
index ab8e155..aa8238f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
@@ -647,7 +647,10 @@ public class DeclarativeSlotManager implements SlotManager 
{
 pendingSlots = allocationResult.getNewAvailableResources();
 if (!allocationResult.isSuccessfulAllocating()
 && sendNotEnoughResourceNotifications) {
-LOG.warn("Could not fulfill resource requirements of 
job {}.", jobId);
+LOG.warn(
+"Could not fulfill resource requirements of 
job {}. Free slots: {}",
+jobId,
+slotTracker.getFreeSlots().size());
 resourceActions.notifyNotEnoughResourcesAvailable(
 jobId, 
resourceTracker.getAcquiredResources(jobId));
 return pendingSlots;


[flink] branch master updated: [hotfix][runtime] Log slot pool status if unable to fulfill job requirements

2021-07-13 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new b1c5421  [hotfix][runtime] Log slot pool status if unable to fulfill 
job requirements
b1c5421 is described below

commit b1c5421a42c9e6bc5b16114fc75ccc7fb001c236
Author: Roman Khachatryan 
AuthorDate: Mon Jul 12 02:27:47 2021 +0200

[hotfix][runtime] Log slot pool status if unable to fulfill job requirements
---
 .../runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java| 9 ++---
 .../runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java   | 8 
 .../resourcemanager/slotmanager/DeclarativeSlotManager.java  | 5 -
 3 files changed, 18 insertions(+), 4 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index e322df1..fbfd1ff 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -348,14 +348,17 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
 Collection acquiredResources) {
 assertRunningInMainThread();
 
-failPendingRequests();
+failPendingRequests(acquiredResources);
 }
 
-private void failPendingRequests() {
+private void failPendingRequests(Collection 
acquiredResources) {
 if (!pendingRequests.isEmpty()) {
 final NoResourceAvailableException cause =
 new NoResourceAvailableException(
-"Could not acquire the minimum required 
resources.");
+"Could not acquire the minimum required resources. 
Acquired: "
++ acquiredResources
++ ". Current slot pool status: "
++ getSlotServiceStatus());
 
 cancelPendingRequests(
 request -> !isBatchSlotRequestTimeoutCheckDisabled || 
!request.isBatchRequest(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
index 2c19471..ec1a970 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
@@ -306,4 +306,12 @@ public class DeclarativeSlotPoolService implements 
SlotPoolService {
 STARTED,
 CLOSED,
 }
+
+protected String getSlotServiceStatus() {
+return String.format(
+"Registered TMs: %d, registered slots: %d free slots: %d",
+registeredTaskManagers.size(),
+declarativeSlotPool.getAllSlotsInformation().size(),
+declarativeSlotPool.getFreeSlotsInformation().size());
+}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
index 07fa560..18809e5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
@@ -647,7 +647,10 @@ public class DeclarativeSlotManager implements SlotManager 
{
 pendingSlots = allocationResult.getNewAvailableResources();
 if (!allocationResult.isSuccessfulAllocating()
 && sendNotEnoughResourceNotifications) {
-LOG.warn("Could not fulfill resource requirements of 
job {}.", jobId);
+LOG.warn(
+"Could not fulfill resource requirements of 
job {}. Free slots: {}",
+jobId,
+slotTracker.getFreeSlots().size());
 resourceActions.notifyNotEnoughResourcesAvailable(
 jobId, 
resourceTracker.getAcquiredResources(jobId));
 return pendingSlots;


[flink-training] branch master updated: [FLINK-23335][gradle] add separate run tasks for Java/Scala exercises and solutions

2021-07-13 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git


The following commit(s) were added to refs/heads/master by this push:
 new 9e90776  [FLINK-23335][gradle] add separate run tasks for Java/Scala 
exercises and solutions
9e90776 is described below

commit 9e907760965a883881d7891baa0d6a82011cb626
Author: Nico Kruber 
AuthorDate: Fri Jul 2 15:43:13 2021 +0200

[FLINK-23335][gradle] add separate run tasks for Java/Scala exercises and 
solutions

This allows to quickly run the current state of the exercise or the solution
implementation from the command line / IDE.
---
 README.md | 19 ++-
 build.gradle  | 39 +++
 hourly-tips/build.gradle  |  7 ++-
 long-ride-alerts/build.gradle |  7 ++-
 ride-cleansing/build.gradle   |  7 ++-
 rides-and-fares/build.gradle  |  7 ++-
 6 files changed, 81 insertions(+), 5 deletions(-)

diff --git a/README.md b/README.md
index 368b904..834032a 100644
--- a/README.md
+++ b/README.md
@@ -176,7 +176,24 @@ Each of these exercises includes an `...Exercise` class 
with most of the necessa
 
 > **:information_source: Note:** As long as your `...Exercise` class is 
 > throwing a `MissingSolutionException`, the provided JUnit test classes will 
 > ignore that failure and verify the correctness of the solution 
 > implementation instead.
 
-There are Java and Scala versions of all the exercise, test, and solution 
classes.
+There are Java and Scala versions of all the exercise, test, and solution 
classes, each of which can be run from IntelliJ as usual.
+
+ Running Exercises, Tests, and Solutions on the Command Line
+
+You can execute exercises, solutions, and tests via `gradlew` from a CLI.
+
+- Tests can be executed as usual:
+
+```bash
+./gradlew test
+./gradlew ::test
+```
+
+- For Java/Scala exercises and solutions, we provide special tasks that are 
listed via
+
+```bash
+./gradlew printRunTasks
+```
 
 -
 
diff --git a/build.gradle b/build.gradle
index b13aaa9..3702325 100644
--- a/build.gradle
+++ b/build.gradle
@@ -94,6 +94,18 @@ subprojects {
 test.runtimeClasspath += configurations.shadow
 }
 
+project.plugins.withId('application') {
+['javaExerciseClassName', 'scalaExerciseClassName',
+ 'javaSolutionClassName', 'scalaSolutionClassName'].each { property ->
+if (project.ext.has(property)) {
+project.tasks.create(name: 
classNamePropertyToTaskName(property), type: JavaExec) {
+classpath = project.sourceSets.main.runtimeClasspath
+mainClass = project.ext.get(property)
+group = 'flink-training'
+}
+}
+}
+}
 
 jar {
 manifest {
@@ -119,3 +131,30 @@ subprojects {
 
 assemble.dependsOn(shadowJar)
 }
+
+tasks.register('printRunTasks') {
+println ''
+println 'Flink Training Tasks runnable from root project \'' + 
project.name + '\''
+println ''
+
+subprojects.findAll { project ->
+boolean first = true;
+project.tasks.withType(JavaExec) { task ->
+if (task.group == 'flink-training') {
+if (first) {
+println ''
+println '> Subproject \'' + project.name + '\''
+first = false;
+}
+println './gradlew :' + project.name + ':' + task.name
+}
+}
+}
+}
+
+static def classNamePropertyToTaskName(String property) {
+return 'run' +
+property.charAt(0).toString().toUpperCase() +
+property.substring(1, property.lastIndexOf('ClassName'))
+
+}
diff --git a/hourly-tips/build.gradle b/hourly-tips/build.gradle
index 75e3e60..20bcd13 100644
--- a/hourly-tips/build.gradle
+++ b/hourly-tips/build.gradle
@@ -1,3 +1,8 @@
+ext.javaExerciseClassName = 
'org.apache.flink.training.exercises.hourlytips.HourlyTipsExercise'
+ext.scalaExerciseClassName = 
'org.apache.flink.training.exercises.hourlytips.scala.HourlyTipsExercise'
+ext.javaSolutionClassName = 
'org.apache.flink.training.solutions.hourlytips.HourlyTipsSolution'
+ext.scalaSolutionClassName = 
'org.apache.flink.training.solutions.hourlytips.scala.HourlyTipsSolution'
+
 apply plugin: 'application'
 
-mainClassName = 
'org.apache.flink.training.exercises.hourlytips.HourlyTipsExercise'
+mainClassName = ext.javaExerciseClassName
diff --git a/long-ride-alerts/build.gradle b/long-ride-alerts/build.gradle
index 59b9448..8036a4f 100644
--- a/long-ride-alerts/build.gradle
+++ b/long-ride-alerts/build.gradle
@@ -1,3 +1,8 @@
+ext.javaExerciseClassName = 
'org.apache.flink.training.exercises.lo

[flink-training] 03/03: [FLINK-23338] Add .git-blame-ignore-revs for ignoring refactor commit

2021-07-13 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git

commit 5d17ed7eafbc636a510b4f933101226835fefa24
Author: Nico Kruber 
AuthorDate: Fri Jul 2 17:46:10 2021 +0200

[FLINK-23338] Add .git-blame-ignore-revs for ignoring refactor commit

This file can be used via:

$ git config blame.ignoreRevsFile .git-blame-ignore-revs

This closes #27.
---
 .git-blame-ignore-revs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs
new file mode 100644
index 000..b41871e
--- /dev/null
+++ b/.git-blame-ignore-revs
@@ -0,0 +1 @@
+c4baefd1830abe3ba9bcf200be0f5d00095cae12


[flink-training] 02/03: [FLINK-23338] Format code with Spotless/google-java-format

2021-07-13 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git

commit c4baefd1830abe3ba9bcf200be0f5d00095cae12
Author: Rufus Refactor 
AuthorDate: Tue Jul 13 21:45:34 2021 +0200

[FLINK-23338] Format code with Spotless/google-java-format
---
 README.md  |   2 +-
 .../examples/ridecount/RideCountExample.java   |  60 +--
 .../exercises/common/datatypes/TaxiFare.java   | 154 
 .../exercises/common/datatypes/TaxiRide.java   | 297 +++---
 .../common/sources/TaxiFareGenerator.java  |  35 +-
 .../common/sources/TaxiRideGenerator.java  | 109 +++---
 .../exercises/common/utils/DataGenerator.java  | 304 +++
 .../exercises/common/utils/ExerciseBase.java   |  98 ++---
 .../training/exercises/common/utils/GeoUtils.java  | 433 ++---
 .../common/utils/MissingSolutionException.java |  10 +-
 .../exercises/testing/TaxiRideTestBase.java| 291 +++---
 .../training/exercises/testing/TestSource.java |  44 +--
 hourly-tips/DISCUSSION.md  |  34 +-
 .../exercises/hourlytips/HourlyTipsExercise.java   |  38 +-
 .../solutions/hourlytips/HourlyTipsSolution.java   |  92 ++---
 .../exercises/hourlytips/HourlyTipsTest.java   |  97 ++---
 long-ride-alerts/DISCUSSION.md |   2 +-
 long-ride-alerts/README.md |   8 +-
 .../exercises/longrides/LongRidesExercise.java |  61 ++-
 .../solutions/longrides/LongRidesSolution.java | 136 +++
 .../exercises/longrides/LongRidesTest.java | 178 +
 .../ridecleansing/RideCleansingExercise.java   |  57 ++-
 .../ridecleansing/RideCleansingSolution.java   |  58 +--
 .../exercises/ridecleansing/RideCleansingTest.java |  73 ++--
 .../ridesandfares/RidesAndFaresExercise.java   |  69 ++--
 .../ridesandfares/RidesAndFaresSolution.java   | 162 
 .../exercises/ridesandfares/RidesAndFaresTest.java |  93 ++---
 27 files changed, 1489 insertions(+), 1506 deletions(-)

diff --git a/README.md b/README.md
index 834032a..37c1d26 100644
--- a/README.md
+++ b/README.md
@@ -111,7 +111,7 @@ Once that’s done you should be able to open 
[`RideCleansingTest`](ride-cleansi
 
 ## Using the Taxi Data Streams
 
-These exercises use data 
[generators](common/src/main/java/org/apache/flink/training/exercises/common/sources)
 that produce simulated event streams 
+These exercises use data 
[generators](common/src/main/java/org/apache/flink/training/exercises/common/sources)
 that produce simulated event streams
 inspired by those shared by the [New York City Taxi & Limousine 
Commission](http://www.nyc.gov/html/tlc/html/home/home.shtml)
 in their public [data set](https://uofi.app.box.com/NYCtaxidata) about taxi 
rides in New York City.
 
diff --git 
a/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
 
b/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
index 74e56fe..36b7b89 100644
--- 
a/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
+++ 
b/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
@@ -29,44 +29,46 @@ import 
org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
 /**
  * Example that counts the rides for each driver.
  *
- * Note that this is implicitly keeping state for each driver.
- * This sort of simple, non-windowed aggregation on an unbounded set of keys 
will use an unbounded amount of state.
- * When this is an issue, look at the SQL/Table API, or ProcessFunction, or 
state TTL, all of which provide
+ * Note that this is implicitly keeping state for each driver. This sort of 
simple, non-windowed
+ * aggregation on an unbounded set of keys will use an unbounded amount of 
state. When this is an
+ * issue, look at the SQL/Table API, or ProcessFunction, or state TTL, all of 
which provide
  * mechanisms for expiring state for stale keys.
  */
 public class RideCountExample {
 
-   /**
-* Main method.
-*
-* @throws Exception which occurs during job execution.
-*/
-   public static void main(String[] args) throws Exception {
+/**
+ * Main method.
+ *
+ * @throws Exception which occurs during job execution.
+ */
+public static void main(String[] args) throws Exception {
 
-   // set up streaming execution environment
-   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+// set up streaming execution environment
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-   // start the data generator
-   DataStream rides = env.addSource(new 
TaxiRideGenerator());
+// start

[flink-training] 01/03: [FLINK-23338] Add Spotless plugin with Google AOSP style

2021-07-13 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git

commit 8399d88b3638d69db01cea341631979a281d1072
Author: Nico Kruber 
AuthorDate: Fri Jul 2 17:36:31 2021 +0200

[FLINK-23338] Add Spotless plugin with Google AOSP style

Use the same format as defined by the main Flink project.
See https://issues.apache.org/jira/browse/FLINK-20651
---
 build.gradle   |  27 ++
 config/checkstyle/checkstyle.xml   | 891 ++---
 config/checkstyle/suppressions.xml |  10 +-
 3 files changed, 460 insertions(+), 468 deletions(-)

diff --git a/build.gradle b/build.gradle
index 3702325..0442b26 100644
--- a/build.gradle
+++ b/build.gradle
@@ -17,6 +17,7 @@
 
 plugins {
 id 'com.github.johnrengelman.shadow' version '7.0.0' apply false
+id "com.diffplug.spotless" version "5.14.0" apply false
 }
 
 description = "Flink Training Exercises"
@@ -24,6 +25,20 @@ description = "Flink Training Exercises"
 allprojects {
 group = 'org.apache.flink'
 version = '1.13-SNAPSHOT'
+
+apply plugin: 'com.diffplug.spotless'
+
+spotless {
+format 'misc', {
+// define the files to apply `misc` to
+target '*.gradle', '*.md', '.gitignore'
+
+// define the steps to apply to those files
+trimTrailingWhitespace()
+indentWithSpaces(4)
+endWithNewline()
+}
+}
 }
 
 subprojects {
@@ -107,6 +122,18 @@ subprojects {
 }
 }
 
+spotless {
+java {
+googleJavaFormat('1.7').aosp()
+
+// \# refers to static imports
+importOrder('org.apache.flink', 'org.apache.flink.shaded', '', 
'javax', 'java', 'scala', '\\#')
+removeUnusedImports()
+
+targetExclude("**/generated*/*.java")
+}
+}
+
 jar {
 manifest {
 attributes 'Built-By': System.getProperty('user.name'),
diff --git a/config/checkstyle/checkstyle.xml b/config/checkstyle/checkstyle.xml
index e656fda..56d219d 100644
--- a/config/checkstyle/checkstyle.xml
+++ b/config/checkstyle/checkstyle.xml
@@ -31,540 +31,505 @@ This file is based on the checkstyle file of Apache Beam.
 
 
 
-  
-
-  
-
-  
-
-
-  
-
-  
-
-
-
-
-  
-
-  
-
-
-
-  
-
-  
-
-
-
-  
-
-  
-  
-
-  
-
-  
-
-  
-
-  
-  
-
-  
-
-  
-
-  
-
-  
-  
-
-
-
-  
-  
-  
-
-
-
-
-
-  
-  
-  
-
-
-
-
-  
-  
-  
-
-
-
-  
-  
-  
-
-
-
-  
-  
-  
-
-
-
-
-
-
-  
-  
-  
-
-
-
-  
-  
-  
-
-
-
-  
-  
-  
-
-
-  
-  
-  
-
-
-
-  
-  
-  
-
-
-  
-  
-  
-
-
-  
-  
-  
-
-
-  
-  
-  
+
+
+
 
 
-
-
-
-
-  
-  
+
+
+
+
+
 
 
-
-
-
-  
-  
-  
+
+
+
+
 
 
-
-  
-  
-  
-  
-  
-  
-  
-  
+
+
+
+
 
 
-
-  
-
-
-
-  
-
-
-
-  
-  
-
-
-
-
-  
+
+
+
 
 
-
-
-  
-  
+
+
 
 
-
-  
-  
-  
-
+
+
 
 
 
-
-
-
-  
-  
-  
-  
-  
-
-
-
+
+
 
 
-
-
+
+
 
-
-  
-  
-  
-
+
+
+
+
+
+
 
-
-  
-  
-
+
 
--->
+
+
+
+
+
+
 
-
+
+
+
+
+
 
-
-  
-  
-  
-  
-
+
+
+
+
+
 
-
-  
-  
-  
-
+
-  
-  
-  
-  
-  
-  
-  
-  
-
+IllegalImport cannot blacklist classes so we have to fall back to 
Regexp.
 
-
-  
-  
-  
-  
-  
-  
-  
-  
-
+-->
 
-
-  
-  
-  
-  
-  
-  
-  
-  
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+   

[flink-training] branch master updated (9e90776 -> 5d17ed7)

2021-07-13 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git.


from 9e90776  [FLINK-23335][gradle] add separate run tasks for Java/Scala 
exercises and solutions
 new 8399d88  [FLINK-23338] Add Spotless plugin with Google AOSP style
 new c4baefd  [FLINK-23338] Format code with Spotless/google-java-format
 new 5d17ed7  [FLINK-23338] Add .git-blame-ignore-revs for ignoring 
refactor commit

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .git-blame-ignore-revs |   1 +
 README.md  |   2 +-
 build.gradle   |  27 +
 .../examples/ridecount/RideCountExample.java   |  60 +-
 .../exercises/common/datatypes/TaxiFare.java   | 154 ++--
 .../exercises/common/datatypes/TaxiRide.java   | 297 ---
 .../common/sources/TaxiFareGenerator.java  |  35 +-
 .../common/sources/TaxiRideGenerator.java  | 109 +--
 .../exercises/common/utils/DataGenerator.java  | 304 ---
 .../exercises/common/utils/ExerciseBase.java   |  98 +--
 .../training/exercises/common/utils/GeoUtils.java  | 433 +-
 .../common/utils/MissingSolutionException.java |  10 +-
 .../exercises/testing/TaxiRideTestBase.java| 291 +++
 .../training/exercises/testing/TestSource.java |  44 +-
 config/checkstyle/checkstyle.xml   | 891 ++---
 config/checkstyle/suppressions.xml |  10 +-
 hourly-tips/DISCUSSION.md  |  34 +-
 .../exercises/hourlytips/HourlyTipsExercise.java   |  38 +-
 .../solutions/hourlytips/HourlyTipsSolution.java   |  92 +--
 .../exercises/hourlytips/HourlyTipsTest.java   |  97 +--
 long-ride-alerts/DISCUSSION.md |   2 +-
 long-ride-alerts/README.md |   8 +-
 .../exercises/longrides/LongRidesExercise.java |  61 +-
 .../solutions/longrides/LongRidesSolution.java | 136 ++--
 .../exercises/longrides/LongRidesTest.java | 178 ++--
 .../ridecleansing/RideCleansingExercise.java   |  57 +-
 .../ridecleansing/RideCleansingSolution.java   |  58 +-
 .../exercises/ridecleansing/RideCleansingTest.java |  73 +-
 .../ridesandfares/RidesAndFaresExercise.java   |  69 +-
 .../ridesandfares/RidesAndFaresSolution.java   | 162 ++--
 .../exercises/ridesandfares/RidesAndFaresTest.java |  93 +--
 31 files changed, 1950 insertions(+), 1974 deletions(-)
 create mode 100644 .git-blame-ignore-revs


[flink] 02/02: [hotfix] Remove unnecessary warning suppression.

2021-07-13 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6451a1c169fbde44c2c7c458cc717c0fcad01cf9
Author: Stephan Ewen 
AuthorDate: Tue Jul 13 15:05:25 2021 +0200

[hotfix] Remove unnecessary warning suppression.

This suppression was added to supress compiler warning about 'finally' not 
returning from 'System.exit()'.
With the introduction of the indirection to the security manager, this 
compiler warning no longer happens.
---
 .../src/main/java/org/apache/flink/util/FatalExitExceptionHandler.java   | 1 -
 1 file changed, 1 deletion(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/util/FatalExitExceptionHandler.java 
b/flink-core/src/main/java/org/apache/flink/util/FatalExitExceptionHandler.java
index 2433398..8ff0e9b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/util/FatalExitExceptionHandler.java
+++ 
b/flink-core/src/main/java/org/apache/flink/util/FatalExitExceptionHandler.java
@@ -37,7 +37,6 @@ public final class FatalExitExceptionHandler implements 
Thread.UncaughtException
 public static final int EXIT_CODE = -17;
 
 @Override
-@SuppressWarnings("finally")
 public void uncaughtException(Thread t, Throwable e) {
 try {
 LOG.error(


[flink] 01/02: [FLINK-22545][coordination] Fix check during creation of Source Coordinator thread.

2021-07-13 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4b867d8a6c187d6bef45294a6e6a524254410167
Author: Stephan Ewen 
AuthorDate: Tue Jul 13 14:36:32 2021 +0200

[FLINK-22545][coordination] Fix check during creation of Source Coordinator 
thread.

The check was meant as a safeguard to prevent re-instantiation after fatal 
errors killed a previous thread.
But the check was susceptible to thread termination due to idleness in the 
executor.

This updates the check to only fail if there is in fact an instantiation 
next to a running thread, or after a
previously crashed thread.
---
 .../coordinator/SourceCoordinatorProvider.java | 33 +-
 1 file changed, 26 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index bb6f835..56248f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -27,6 +27,8 @@ import 
org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
 import org.apache.flink.util.FatalExitExceptionHandler;
 
+import javax.annotation.Nullable;
+
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -85,13 +87,16 @@ public class SourceCoordinatorProvider
 }
 
 /** A thread factory class that provides some helper methods. */
-public static class CoordinatorExecutorThreadFactory implements 
ThreadFactory {
+public static class CoordinatorExecutorThreadFactory
+implements ThreadFactory, Thread.UncaughtExceptionHandler {
 
 private final String coordinatorThreadName;
 private final ClassLoader cl;
 private final Thread.UncaughtExceptionHandler errorHandler;
 
-private Thread t;
+@Nullable private Thread t;
+
+@Nullable private volatile Throwable previousFailureReason;
 
 CoordinatorExecutorThreadFactory(
 final String coordinatorThreadName, final ClassLoader 
contextClassLoader) {
@@ -110,18 +115,32 @@ public class SourceCoordinatorProvider
 
 @Override
 public synchronized Thread newThread(Runnable r) {
-if (t != null) {
+if (t != null && t.isAlive()) {
+throw new Error(
+"Source Coordinator Thread already exists. There 
should never be more than one "
++ "thread driving the actions of a Source 
Coordinator. Existing Thread: "
++ t);
+}
+if (t != null && previousFailureReason != null) {
 throw new Error(
-"This indicates that a fatal error has happened and 
caused the "
-+ "coordinator executor thread to exit. Check 
the earlier logs "
-+ "to see the root cause of the problem.");
+"The following fatal error has happened in a 
previously spawned "
++ "Source Coordinator thread. No new thread 
can be spawned.",
+previousFailureReason);
 }
 t = new Thread(r, coordinatorThreadName);
 t.setContextClassLoader(cl);
-t.setUncaughtExceptionHandler(errorHandler);
+t.setUncaughtExceptionHandler(this);
 return t;
 }
 
+@Override
+public synchronized void uncaughtException(Thread t, Throwable e) {
+if (previousFailureReason == null) {
+previousFailureReason = e;
+}
+errorHandler.uncaughtException(t, e);
+}
+
 String getCoordinatorThreadName() {
 return coordinatorThreadName;
 }


[flink] branch master updated (b1c5421 -> 6451a1c)

2021-07-13 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from b1c5421  [hotfix][runtime] Log slot pool status if unable to fulfill 
job requirements
 new 4b867d8  [FLINK-22545][coordination] Fix check during creation of 
Source Coordinator thread.
 new 6451a1c  [hotfix] Remove unnecessary warning suppression.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/util/FatalExitExceptionHandler.java  |  1 -
 .../coordinator/SourceCoordinatorProvider.java | 33 +-
 2 files changed, 26 insertions(+), 8 deletions(-)


[flink] branch release-1.13 updated: [FLINK-22545][coordination] Fix check during creation of Source Coordinator thread.

2021-07-13 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
 new 17a294d  [FLINK-22545][coordination] Fix check during creation of 
Source Coordinator thread.
17a294d is described below

commit 17a294daacdc67b1939607a41e67e56af2fa6888
Author: Stephan Ewen 
AuthorDate: Tue Jul 13 14:36:32 2021 +0200

[FLINK-22545][coordination] Fix check during creation of Source Coordinator 
thread.

The check was meant as a safeguard to prevent re-instantiation after fatal 
errors killed a previous thread.
But the check was susceptible to thread termination due to idleness in the 
executor.

This updates the check to only fail if there is in fact an instantiation 
next to a running thread, or after a
previously crashed thread.
---
 .../coordinator/SourceCoordinatorProvider.java | 33 +-
 1 file changed, 26 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index 563ed28..1660027 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -27,6 +27,8 @@ import 
org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
 import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 
+import javax.annotation.Nullable;
+
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -85,13 +87,16 @@ public class SourceCoordinatorProvider
 }
 
 /** A thread factory class that provides some helper methods. */
-public static class CoordinatorExecutorThreadFactory implements 
ThreadFactory {
+public static class CoordinatorExecutorThreadFactory
+implements ThreadFactory, Thread.UncaughtExceptionHandler {
 
 private final String coordinatorThreadName;
 private final ClassLoader cl;
 private final Thread.UncaughtExceptionHandler errorHandler;
 
-private Thread t;
+@Nullable private Thread t;
+
+@Nullable private volatile Throwable previousFailureReason;
 
 CoordinatorExecutorThreadFactory(
 final String coordinatorThreadName, final ClassLoader 
contextClassLoader) {
@@ -110,18 +115,32 @@ public class SourceCoordinatorProvider
 
 @Override
 public synchronized Thread newThread(Runnable r) {
-if (t != null) {
+if (t != null && t.isAlive()) {
+throw new Error(
+"Source Coordinator Thread already exists. There 
should never be more than one "
++ "thread driving the actions of a Source 
Coordinator. Existing Thread: "
++ t);
+}
+if (t != null && previousFailureReason != null) {
 throw new Error(
-"This indicates that a fatal error has happened and 
caused the "
-+ "coordinator executor thread to exit. Check 
the earlier logs"
-+ "to see the root cause of the problem.");
+"The following fatal error has happened in a 
previously spawned "
++ "Source Coordinator thread. No new thread 
can be spawned.",
+previousFailureReason);
 }
 t = new Thread(r, coordinatorThreadName);
 t.setContextClassLoader(cl);
-t.setUncaughtExceptionHandler(errorHandler);
+t.setUncaughtExceptionHandler(this);
 return t;
 }
 
+@Override
+public synchronized void uncaughtException(Thread t, Throwable e) {
+if (previousFailureReason == null) {
+previousFailureReason = e;
+}
+errorHandler.uncaughtException(t, e);
+}
+
 String getCoordinatorThreadName() {
 return coordinatorThreadName;
 }


[flink] branch release-1.12 updated (2c3f8f6 -> 331ae2c)

2021-07-13 Thread sewen
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 2c3f8f6  [FLINK-23233][runtime] Ensure checkpoints confirmed after all 
the failed events processed for OepratorCoordinator
 add 331ae2c  [FLINK-22545][coordination] Fix check during creation of 
Source Coordinator thread.

No new revisions were added by this update.

Summary of changes:
 .../coordinator/SourceCoordinatorProvider.java | 33 +-
 1 file changed, 26 insertions(+), 7 deletions(-)


[flink] branch master updated (6451a1c -> 2dec1fa)

2021-07-13 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 6451a1c  [hotfix] Remove unnecessary warning suppression.
 add 2dec1fa  [hotfix][docs][kafka] Corrected idleness attribute name

No new revisions were added by this update.

Summary of changes:
 docs/content.zh/docs/connectors/datastream/kafka.md | 2 +-
 docs/content/docs/connectors/datastream/kafka.md| 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)


[flink] branch master updated (2dec1fa -> 08b2048)

2021-07-13 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 2dec1fa  [hotfix][docs][kafka] Corrected idleness attribute name
 add 08b2048  [FLINK-23347][docs][datatream] Updated operators windowAll 
description

No new revisions were added by this update.

Summary of changes:
 docs/content.zh/docs/dev/datastream/operators/overview.md | 2 +-
 docs/content/docs/dev/datastream/operators/overview.md| 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)


[flink] branch master updated (08b2048 -> df4fa8a)

2021-07-13 Thread hxb
This is an automated email from the ASF dual-hosted git repository.

hxb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 08b2048  [FLINK-23347][docs][datatream] Updated operators windowAll 
description
 add df4fa8a  [FLINK-22865][python] Optimize state serialize/deserialize in 
PyFlink

No new revisions were added by this update.

Summary of changes:
 flink-python/pyflink/common/serializer.py  |   4 +
 flink-python/pyflink/datastream/data_stream.py |   7 +-
 flink-python/pyflink/datastream/state.py   |  26 +
 .../pyflink/datastream/tests/test_data_stream.py   |  35 +++
 flink-python/pyflink/datastream/window.py  |  22 ++--
 .../fn_execution/beam/beam_coder_impl_fast.pyx |   8 +-
 .../fn_execution/beam/beam_coder_impl_slow.py  |   5 +-
 .../pyflink/fn_execution/beam/beam_coders.py   |  29 +-
 .../fn_execution/beam/beam_operations_fast.pyx |   2 +-
 .../beam/{beam_stream.pxd => beam_stream_fast.pxd} |   0
 .../beam/{beam_stream.pyx => beam_stream_fast.pyx} |   0
 .../pyflink/fn_execution/beam/beam_stream_slow.py  |  25 +++--
 .../pyflink/fn_execution/coder_impl_fast.pxd   |   9 +-
 .../pyflink/fn_execution/coder_impl_fast.pyx   |  45 -
 .../pyflink/fn_execution/coder_impl_slow.py|  51 +-
 flink-python/pyflink/fn_execution/coders.py| 111 -
 .../pyflink/fn_execution/datastream/operations.py  |   4 +-
 .../fn_execution/datastream/runtime_context.py |  25 +++--
 .../datastream/window/merging_window_set.py|  10 +-
 .../datastream/window/window_operator.py   |  12 +--
 flink-python/pyflink/fn_execution/state_impl.py|   2 +
 flink-python/pyflink/fn_execution/stream_slow.py   |   3 +-
 .../pyflink/fn_execution/table/aggregate_fast.pyx  |  15 ++-
 .../pyflink/fn_execution/table/aggregate_slow.py   |  15 ++-
 .../pyflink/fn_execution/table/operations.py   |   4 +-
 .../pyflink/fn_execution/table/state_data_view.py  |   4 +-
 .../fn_execution/table/window_aggregate_fast.pyx   |  12 +--
 .../fn_execution/table/window_aggregate_slow.py|  12 +--
 .../pyflink/fn_execution/table/window_assigner.py  |   5 +-
 .../pyflink/fn_execution/table/window_context.py   |  15 ++-
 .../pyflink/fn_execution/table/window_trigger.py   |   2 +-
 flink-python/setup.py  |   8 +-
 32 files changed, 336 insertions(+), 191 deletions(-)
 rename flink-python/pyflink/fn_execution/beam/{beam_stream.pxd => 
beam_stream_fast.pxd} (100%)
 rename flink-python/pyflink/fn_execution/beam/{beam_stream.pyx => 
beam_stream_fast.pyx} (100%)
 copy flink-dist/src/main/flink-bin/conf/zoo.cfg => 
flink-python/pyflink/fn_execution/beam/beam_stream_slow.py (65%)


[flink] branch release-1.13.2-rc2 created (now acaad85)

2021-07-13 Thread tangyun
This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a change to branch release-1.13.2-rc2
in repository https://gitbox.apache.org/repos/asf/flink.git.


  at acaad85  Commit for release 1.13.2

This branch includes the following new commits:

 new acaad85  Commit for release 1.13.2

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[flink] 01/01: Commit for release 1.13.2

2021-07-13 Thread tangyun
This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch release-1.13.2-rc2
in repository https://gitbox.apache.org/repos/asf/flink.git

commit acaad856ee68422130719bd44cb204da45d130ab
Author: Yun Tang 
AuthorDate: Wed Jul 14 10:38:57 2021 +0800

Commit for release 1.13.2
---
 docs/config.toml  | 2 +-
 flink-annotations/pom.xml | 2 +-
 flink-clients/pom.xml | 2 +-
 flink-connectors/flink-connector-base/pom.xml | 2 +-
 flink-connectors/flink-connector-cassandra/pom.xml| 2 +-
 flink-connectors/flink-connector-elasticsearch-base/pom.xml   | 2 +-
 flink-connectors/flink-connector-elasticsearch5/pom.xml   | 2 +-
 flink-connectors/flink-connector-elasticsearch6/pom.xml   | 2 +-
 flink-connectors/flink-connector-elasticsearch7/pom.xml   | 2 +-
 flink-connectors/flink-connector-files/pom.xml| 2 +-
 flink-connectors/flink-connector-gcp-pubsub/pom.xml   | 2 +-
 flink-connectors/flink-connector-hbase-1.4/pom.xml| 2 +-
 flink-connectors/flink-connector-hbase-2.2/pom.xml| 2 +-
 flink-connectors/flink-connector-hbase-base/pom.xml   | 2 +-
 flink-connectors/flink-connector-hive/pom.xml | 2 +-
 flink-connectors/flink-connector-jdbc/pom.xml | 2 +-
 flink-connectors/flink-connector-kafka/pom.xml| 2 +-
 flink-connectors/flink-connector-kinesis/pom.xml  | 2 +-
 flink-connectors/flink-connector-nifi/pom.xml | 2 +-
 flink-connectors/flink-connector-rabbitmq/pom.xml | 2 +-
 flink-connectors/flink-connector-twitter/pom.xml  | 2 +-
 flink-connectors/flink-file-sink-common/pom.xml   | 2 +-
 flink-connectors/flink-hadoop-compatibility/pom.xml   | 2 +-
 flink-connectors/flink-hcatalog/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-elasticsearch6/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-elasticsearch7/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-hbase-1.4/pom.xml| 2 +-
 flink-connectors/flink-sql-connector-hbase-2.2/pom.xml| 2 +-
 flink-connectors/flink-sql-connector-hive-1.2.2/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-hive-2.2.0/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-hive-2.3.6/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-hive-3.1.2/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-kafka/pom.xml| 2 +-
 flink-connectors/flink-sql-connector-kinesis/pom.xml  | 2 +-
 flink-connectors/pom.xml  | 2 +-
 flink-container/pom.xml   | 2 +-
 flink-contrib/flink-connector-wikiedits/pom.xml   | 2 +-
 flink-contrib/pom.xml | 2 +-
 flink-core/pom.xml| 2 +-
 flink-dist/pom.xml| 2 +-
 flink-docs/pom.xml| 2 +-
 flink-end-to-end-tests/flink-batch-sql-test/pom.xml   | 2 +-
 flink-end-to-end-tests/flink-cli-test/pom.xml | 2 +-
 flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml| 2 +-
 .../flink-connector-gcp-pubsub-emulator-tests/pom.xml | 2 +-
 flink-end-to-end-tests/flink-dataset-allround-test/pom.xml| 2 +-
 .../flink-dataset-fine-grained-recovery-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-datastream-allround-test/pom.xml | 2 +-
 flink-end-to-end-tests/flink-distributed-cache-via-blob-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-elasticsearch7-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml| 2 +-
 flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-end-to-end-tests-hbase/pom.xml   | 2 +-
 flink-end-to-end-tests/flink-file-sink-test/pom.xml   | 2 +-
 flink-end-to-end-tests/flink-glue-schema-registry-test/pom.xml| 2 +-
 flink-end-to-end-tests/flink-heavy-deployment-stress-test/pom.xml | 2 +-
 flink-end-to-end-tests/flink-high-parallelism-iterations-test/pom.xml | 2 +-
 .../flink-local-recovery-and-allocation-test/pom.xml