[flink] branch master updated (83c5e71 -> 5aba616)
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 83c5e71  [FLINK-23150][table-planner] Remove the old code split implementation
     add 1251ab8  [FLINK-21088][runtime][checkpoint] Assign a finished snapshot to task if operators are all finished on restore
     add df99b49  [FLINK-21088][runtime][checkpoint] Pass the finished on restore status to TaskStateManager
     add 5aba616  [FLINK-21088][runtime][checkpoint] Pass the finish on restore status to operator chain

No new revisions were added by this update.

Summary of changes:
 .../api/runtime/SavepointTaskStateManager.java | 5 ++
 .../checkpoint/StateAssignmentOperation.java | 56 ++
 .../runtime/checkpoint/TaskStateAssignment.java | 11 -
 .../runtime/checkpoint/TaskStateSnapshot.java | 17 ++-
 .../flink/runtime/state/TaskStateManager.java | 3 ++
 .../flink/runtime/state/TaskStateManagerImpl.java | 8
 .../checkpoint/StateAssignmentOperationTest.java | 31
 .../runtime/state/TaskStateManagerImplTest.java | 17 +++
 .../flink/runtime/state/TestTaskStateManager.java | 10
 .../streaming/runtime/tasks/OperatorChain.java | 11 -
 .../flink/streaming/runtime/tasks/StreamTask.java | 6 ++-
 .../operators/StreamOperatorChainingTest.java | 2 +-
 .../StreamSourceOperatorLatencyMetricsTest.java | 4 +-
 .../tasks/StreamTaskExecutionDecorationTest.java | 2 +-
 .../tasks/SubtaskCheckpointCoordinatorTest.java | 15 --
 15 files changed, 165 insertions(+), 33 deletions(-)
[flink] branch master updated: [FLINK-23356][hbase] Do not use delegation token in case of keytab
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 657df14  [FLINK-23356][hbase] Do not use delegation token in case of keytab
657df14 is described below

commit 657df14677a8f4e500efdc79627f095e32e8c4b4
Author: Gabor Somogyi
AuthorDate: Mon Jul 12 10:40:54 2021 +0200

    [FLINK-23356][hbase] Do not use delegation token in case of keytab

    Closes #16466
---
 .../org/apache/flink/runtime/security/modules/HadoopModule.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
index 9d23992..78f7ff3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -92,11 +92,13 @@ public class HadoopModule implements SecurityModule {
                     // and does not fallback to using Kerberos tickets
                     Credentials credentialsToBeAdded = new Credentials();
                     final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
+                    final Text hbaseDelegationTokenKind = new Text("HBASE_AUTH_TOKEN");
                     Collection<Token<? extends TokenIdentifier>> usrTok =
                             credentialsFromTokenStorageFile.getAllTokens();
-                    // If UGI use keytab for login, do not load HDFS delegation token.
+                    // If UGI use keytab for login, do not load HDFS/HBase delegation token.
                     for (Token<? extends TokenIdentifier> token : usrTok) {
-                        if (!token.getKind().equals(hdfsDelegationTokenKind)) {
+                        if (!token.getKind().equals(hdfsDelegationTokenKind)
+                                && !token.getKind().equals(hbaseDelegationTokenKind)) {
                             credentialsToBeAdded.addToken(token.getService(), token);
                         }
                     }
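
For readers who want the filtering rule in isolation, here is a minimal sketch of the check the commit above introduces. It is not Flink or Hadoop code: tokens are modelled as plain strings holding their kind, and the class name, method name, and the `OTHER_TOKEN` kind are hypothetical. Only the two skipped kinds, `HDFS_DELEGATION_TOKEN` and `HBASE_AUTH_TOKEN`, come from the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the keytab filtering step; not Flink or Hadoop code. */
final class KeytabTokenFilterSketch {

    // Token kinds that are not loaded when the login was done via keytab (taken from the diff).
    static final Set<String> SKIPPED_KINDS =
            new HashSet<>(Arrays.asList("HDFS_DELEGATION_TOKEN", "HBASE_AUTH_TOKEN"));

    /** Returns the token kinds that would still be added to the credentials. */
    static List<String> kindsToKeep(List<String> tokenKinds) {
        List<String> kept = new ArrayList<>();
        for (String kind : tokenKinds) {
            if (!SKIPPED_KINDS.contains(kind)) {
                kept.add(kind);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // OTHER_TOKEN is an invented kind used only for illustration.
        List<String> kinds =
                Arrays.asList("HDFS_DELEGATION_TOKEN", "HBASE_AUTH_TOKEN", "OTHER_TOKEN");
        System.out.println(kindsToKeep(kinds));
    }
}
```

Using a set of skipped kinds instead of chained `equals` calls is a choice made for this sketch; the commit itself keeps two explicit comparisons.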
[flink] branch master updated: [FLINK-23368][python] Fix the wrong mapping of state cache in PyFlink
This is an automated email from the ASF dual-hosted git repository. hxb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 08e9343 [FLINK-23368][python] Fix the wrong mapping of state cache in PyFlink 08e9343 is described below commit 08e9343b0be8e1d438fe9883f5cc407c5ae9e88e Author: huangxingbo AuthorDate: Tue Jul 13 16:03:16 2021 +0800 [FLINK-23368][python] Fix the wrong mapping of state cache in PyFlink This closes #16476. --- .../pyflink/datastream/tests/test_data_stream.py | 44 ++ flink-python/pyflink/fn_execution/state_impl.py| 4 +- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py b/flink-python/pyflink/datastream/tests/test_data_stream.py index 3d85c4a..027cea3 100644 --- a/flink-python/pyflink/datastream/tests/test_data_stream.py +++ b/flink-python/pyflink/datastream/tests/test_data_stream.py @@ -395,7 +395,13 @@ class DataStreamTests(object): self.assertEqual(expected, actual) def test_key_by_map(self): -ds = self.env.from_collection([('a', 0), ('b', 0), ('c', 1), ('d', 1), ('e', 2)], +from pyflink.util.java_utils import get_j_env_configuration +from pyflink.common import Configuration +self.env.set_parallelism(1) +config = Configuration( + j_configuration=get_j_env_configuration(self.env._j_stream_execution_environment)) +config.set_integer("python.fn-execution.bundle.size", 1) +ds = self.env.from_collection([('a', 0), ('b', 1), ('c', 0), ('d', 1), ('e', 2)], type_info=Types.ROW([Types.STRING(), Types.INT()])) keyed_stream = ds.key_by(MyKeySelector(), key_type=Types.INT()) @@ -404,7 +410,6 @@ class DataStreamTests(object): class AssertKeyMapFunction(MapFunction): def __init__(self): -self.pre = None self.state = None def open(self, runtime_context: RuntimeContext): @@ -412,28 +417,37 @@ class DataStreamTests(object): ValueStateDescriptor("test_state", 
Types.PICKLED_BYTE_ARRAY())) def map(self, value): +if value[0] == 'a': +pass +elif value[0] == 'b': +state_value = self._get_state_value() +assert state_value == 1 +self.state.update(state_value) +elif value[0] == 'c': +state_value = self._get_state_value() +assert state_value == 1 +self.state.update(state_value) +elif value[0] == 'd': +state_value = self._get_state_value() +assert state_value == 2 +self.state.update(state_value) +else: +pass +return value + +def _get_state_value(self): state_value = self.state.value() if state_value is None: state_value = 1 else: state_value += 1 -if value[0] == 'b': -assert self.pre == 'a' -assert state_value == 2 -if value[0] == 'd': -assert self.pre == 'c' -assert state_value == 2 -if value[0] == 'e': -assert state_value == 1 -self.pre = value[0] -self.state.update(state_value) -return value +return state_value keyed_stream.map(AssertKeyMapFunction()).add_sink(self.test_sink) self.env.execute('key_by_test') results = self.test_sink.get_results(True) -expected = ["Row(f0='e', f1=2)", "Row(f0='a', f1=0)", "Row(f0='b', f1=0)", -"Row(f0='c', f1=1)", "Row(f0='d', f1=1)"] +expected = ["Row(f0='e', f1=2)", "Row(f0='a', f1=0)", "Row(f0='b', f1=1)", +"Row(f0='c', f1=0)", "Row(f0='d', f1=1)"] results.sort() expected.sort() self.assertEqual(expected, results) diff --git a/flink-python/pyflink/fn_execution/state_impl.py b/flink-python/pyflink/fn_execution/state_impl.py index 590c1d7..e221e70 100644 --- a/flink-python/pyflink/fn_execution/state_impl.py +++ b/flink-python/pyflink/fn_execution/state_impl.py @@ -1060,14 +1060,14 @@ class RemoteKeyedStateBackend(object): if key == self._current_key: return encoded_old_key = self._encoded_current_key -self._current_key = key -self._encoded_current_key = self._key_coder_impl.encode(self._current_key) for state_name, state_obj in self._all_states.items(): if self._state_cache_size > 0:
[flink] branch release-1.13 updated (2c455f3 -> c1f30ee)
This is an automated email from the ASF dual-hosted git repository.

hxb pushed a change to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 2c455f3  [FLINK-22766][connector/kafka] Report offsets and Kafka consumer metrics in Flink metric group
     add c1f30ee  [FLINK-23368][python] Fix the wrong mapping of state cache in PyFlink

No new revisions were added by this update.

Summary of changes:
 .../pyflink/datastream/tests/test_data_stream.py | 44 ++
 flink-python/pyflink/fn_execution/state_impl.py | 4 +-
 2 files changed, 31 insertions(+), 17 deletions(-)
[flink] branch master updated (08e9343 -> c874338)
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 08e9343  [FLINK-23368][python] Fix the wrong mapping of state cache in PyFlink
     add c874338  [FLINK-23233][runtime] Ensure checkpoints confirmed after all the failed events processed for OepratorCoordinator

No new revisions were added by this update.

Summary of changes:
 .../coordination/OperatorCoordinatorHolder.java | 27 +--
 .../operators/coordination/SubtaskGatewayImpl.java | 31 ---
 .../org/apache/flink/runtime/util/Runnables.java | 12 +++
 .../coordination/EventReceivingTasks.java | 7 +-
 .../OperatorCoordinatorHolderTest.java | 94 ++
 5 files changed, 135 insertions(+), 36 deletions(-)
[flink] branch release-1.13 updated (c1f30ee -> 1fa52e1)
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from c1f30ee  [FLINK-23368][python] Fix the wrong mapping of state cache in PyFlink
     add 1fa52e1  [FLINK-23233][runtime] Ensure checkpoints confirmed after all the failed events processed for OepratorCoordinator

No new revisions were added by this update.

Summary of changes:
 .../coordination/OperatorCoordinatorHolder.java | 27 +--
 .../operators/coordination/SubtaskGatewayImpl.java | 31 ---
 .../org/apache/flink/runtime/util/Runnables.java | 10 +++
 .../coordination/EventReceivingTasks.java | 7 +-
 .../OperatorCoordinatorHolderTest.java | 94 ++
 5 files changed, 133 insertions(+), 36 deletions(-)
[flink] branch release-1.12 updated (6292777 -> 2c3f8f6)
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 6292777  [FLINK-23223] Notifies if there are available data on resumption for pipelined subpartition
     add 2c3f8f6  [FLINK-23233][runtime] Ensure checkpoints confirmed after all the failed events processed for OepratorCoordinator

No new revisions were added by this update.

Summary of changes:
 .../coordination/OperatorCoordinatorHolder.java | 27 +--
 .../operators/coordination/SubtaskGatewayImpl.java | 31 ---
 .../org/apache/flink/runtime/util/Runnables.java | 10 +++
 .../coordination/EventReceivingTasks.java | 7 +-
 .../OperatorCoordinatorHolderTest.java | 94 ++
 5 files changed, 133 insertions(+), 36 deletions(-)
[flink] 01/09: [FLINK-21368] Remove RpcService#getExecutor
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 5d3ca598ab37d3dfec5b01e19080874e541212af Author: Chesnay Schepler AuthorDate: Thu Jul 8 10:51:54 2021 +0200 [FLINK-21368] Remove RpcService#getExecutor --- .../apache/flink/runtime/rpc/akka/AkkaRpcService.java| 16 .../apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java | 2 +- .../java/org/apache/flink/runtime/rpc/RpcService.java| 16 +--- .../flink/runtime/registration/RetryingRegistration.java | 8 .../runtime/taskexecutor/DefaultJobLeaderService.java| 2 +- .../apache/flink/runtime/taskexecutor/TaskExecutor.java | 6 +++--- .../registration/RegisteredRpcConnectionTest.java| 10 +- .../runtime/registration/RetryingRegistrationTest.java | 8 +--- .../apache/flink/runtime/rpc/FencedRpcEndpointTest.java | 2 +- .../taskexecutor/TaskSubmissionTestEnvironment.java | 4 +++- .../OperatorEventSendingCheckpointITCase.java| 6 -- 11 files changed, 28 insertions(+), 52 deletions(-) diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index bba81a7..4c1a788 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -40,6 +40,7 @@ import org.apache.flink.util.TimeUtils; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.concurrent.ScheduledExecutor; +import org.apache.flink.util.function.FunctionUtils; import akka.actor.AbstractActor; import akka.actor.ActorRef; @@ -48,7 +49,6 @@ import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.DeadLetter; import akka.actor.Props; -import akka.dispatch.Futures; import 
akka.pattern.Patterns; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 +69,6 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; @@ -77,7 +76,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import scala.Option; -import scala.concurrent.Future; import scala.reflect.ClassTag$; import static org.apache.flink.util.Preconditions.checkArgument; @@ -468,11 +466,6 @@ public class AkkaRpcService implements RpcService { } @Override -public Executor getExecutor() { -return actorSystem.dispatcher(); -} - -@Override public ScheduledExecutor getScheduledExecutor() { return internalScheduledExecutor; } @@ -488,14 +481,13 @@ public class AkkaRpcService implements RpcService { @Override public void execute(Runnable runnable) { -actorSystem.dispatcher().execute(runnable); +getScheduledExecutor().execute(runnable); } @Override public CompletableFuture execute(Callable callable) { -Future scalaFuture = Futures.future(callable, actorSystem.dispatcher()); - -return AkkaFutureUtils.toJava(scalaFuture); +return CompletableFuture.supplyAsync( +FunctionUtils.uncheckedSupplier(callable::call), getScheduledExecutor()); } // --- diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 01437e4..b522d44 100644 --- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -192,7 +192,7 @@ public class AkkaRpcActorTest extends TestLogger { assertFalse(terminationFuture.isDone()); 
-CompletableFuture.runAsync(rpcEndpoint::closeAsync, akkaRpcService.getExecutor()); +CompletableFuture.runAsync(rpcEndpoint::closeAsync, akkaRpcService.getScheduledExecutor()); // wait until the rpc endpoint has terminated terminationFuture.get(); diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index eb36deb..d8c2d66 100644 --- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++
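
The `AkkaRpcService` hunk in this commit replaces the Scala `Futures.future(...)` call with `CompletableFuture.supplyAsync(...)` on the scheduled executor. The sketch below shows the same pattern with only JDK types. It is an illustration under assumptions: the class and method names are invented, and the try/catch that wraps checked exceptions in a `CompletionException` is a plain-JDK substitute for Flink's `FunctionUtils.uncheckedSupplier`, whose exact rethrow behaviour is not shown in the diff.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: run a Callable on an executor and expose the result as a CompletableFuture. */
final class CallableExecuteSketch {

    static <T> CompletableFuture<T> execute(Callable<T> callable, Executor executor) {
        return CompletableFuture.supplyAsync(
                () -> {
                    try {
                        return callable.call();
                    } catch (Exception e) {
                        // Supplier cannot throw checked exceptions, so wrap them (assumed behaviour).
                        throw new CompletionException(e);
                    }
                },
                executor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // join() waits for the result without declaring checked exceptions.
            System.out.println(execute(() -> 21 * 2, executor).join());
        } finally {
            executor.shutdown();
        }
    }
}
```

The benefit visible in the diff is that no Scala `Future` or Akka dispatcher type appears in the method any more; the caller only sees `java.util.concurrent` types.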
[flink] 03/09: [hotfix] Remove akka dependency in MetricUtilsTest
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ae85de7fa7f06b3d3751405d1fe0b57b448ca7f4
Author: Chesnay Schepler
AuthorDate: Thu Jul 1 10:15:54 2021 +0200

    [hotfix] Remove akka dependency in MetricUtilsTest
---
 .../apache/flink/runtime/metrics/util/MetricUtilsTest.java | 12 +-----------
 1 file changed, 1 insertion(+), 11 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
index 8f387e8..e3cc553 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcSystem;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServicesBuilder;
 import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable;
@@ -40,7 +39,6 @@ import org.apache.flink.util.function.CheckedSupplier;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
 
-import akka.actor.ActorSystem;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -54,7 +52,6 @@ import java.util.List;
 
 import static org.apache.flink.runtime.metrics.util.MetricUtils.METRIC_GROUP_FLINK;
 import static org.apache.flink.runtime.metrics.util.MetricUtils.METRIC_GROUP_MANAGED_MEMORY;
 import static org.apache.flink.runtime.metrics.util.MetricUtils.METRIC_GROUP_MEMORY;
-import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
@@ -83,17 +80,10 @@ public class MetricUtilsTest extends TestLogger {
         final RpcService rpcService =
                 MetricUtils.startRemoteMetricsRpcService(
                         configuration, "localhost", RpcSystem.load());
-        assertThat(rpcService, instanceOf(AkkaRpcService.class));
-
-        final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
 
         try {
             final int threadPriority =
-                    actorSystem
-                            .settings()
-                            .config()
-                            .getInt("akka.actor.default-dispatcher.thread-priority");
-
+                    rpcService.execute(() -> Thread.currentThread().getPriority()).get();
             assertThat(threadPriority, is(expectedThreadPriority));
         } finally {
             rpcService.stopService().get();
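
The test change above stops reading the priority from Akka's configuration and instead observes it from a thread that actually runs the submitted task. A minimal sketch of that idea with a plain JDK executor follows; the class name, method name, and thread name are hypothetical, and a single-thread `ExecutorService` stands in for the `RpcService`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: observe a worker thread's priority by running a task on it. */
final class ThreadPriorityProbeSketch {

    static int observedPriority(int configuredPriority) {
        // The thread factory plays the role of the configured dispatcher (assumption).
        ExecutorService executor =
                Executors.newSingleThreadExecutor(
                        runnable -> {
                            Thread thread = new Thread(runnable, "priority-probe");
                            thread.setPriority(configuredPriority);
                            thread.setDaemon(true);
                            return thread;
                        });
        try {
            // The lambda returns a value, so it is submitted as a Callable<Integer>.
            return executor.submit(() -> Thread.currentThread().getPriority()).get();
        } catch (Exception e) {
            throw new IllegalStateException("probe failed", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(observedPriority(Thread.MAX_PRIORITY - 1));
    }
}
```

Probing the running thread keeps the assertion independent of the RPC implementation, which is why the `AkkaRpcService` cast and the `ActorSystem` import could be dropped from the test.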
[flink] 05/09: [hotfix] Extract logging parent-first patterns into constant
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 9cf4c48945f10db074854828d92ca70280ef23ab Author: Chesnay Schepler AuthorDate: Mon Jul 5 10:56:34 2021 +0200 [hotfix] Extract logging parent-first patterns into constant --- docs/layouts/shortcodes/generated/core_configuration.html| 2 +- .../shortcodes/generated/expert_class_loading_section.html | 2 +- .../java/org/apache/flink/configuration/CoreOptions.java | 12 +--- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index bf00ecd..43ec684 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -30,7 +30,7 @@ This check should only be disabled if such a leak prevents further jobs from run classloader.parent-first-patterns.default -"java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback;org.xml;javax.xml;org.apache.xerces;org.w3c" +"java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.xml;javax.xml;org.apache.xerces;org.w3c;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback" String A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. This setting should generally not be modified. To add another pattern we recommend to use "classloader.parent-first-patterns.additional" instead. 
diff --git a/docs/layouts/shortcodes/generated/expert_class_loading_section.html b/docs/layouts/shortcodes/generated/expert_class_loading_section.html index a4236f1..c88ac73 100644 --- a/docs/layouts/shortcodes/generated/expert_class_loading_section.html +++ b/docs/layouts/shortcodes/generated/expert_class_loading_section.html @@ -30,7 +30,7 @@ This check should only be disabled if such a leak prevents further jobs from run classloader.parent-first-patterns.default -"java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback;org.xml;javax.xml;org.apache.xerces;org.w3c" +"java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.xml;javax.xml;org.apache.xerces;org.w3c;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback" String A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. This setting should generally not be modified. To add another pattern we recommend to use "classloader.parent-first-patterns.additional" instead. 
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java index 7fde49a..db21d21 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java @@ -18,6 +18,7 @@ package org.apache.flink.configuration; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.docs.ConfigGroup; import org.apache.flink.annotation.docs.ConfigGroups; @@ -34,6 +35,10 @@ import static org.apache.flink.configuration.ConfigOptions.key; @ConfigGroups(groups = {@ConfigGroup(name = "Environment", keyPrefix = "env")}) public class CoreOptions { +@Internal +public static final String PARENT_FIRST_LOGGING_PATTERNS = + "org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback"; + // // Classloading Parameters // @@ -92,7 +97,8 @@ public class CoreOptions { public static final ConfigOption ALWAYS_PARENT_FIRST_LOADER_PATTERNS = ConfigOptions.key("classloader.parent-first-patterns.default") .defaultValue( - "java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging;org.apach
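
The documentation text in this commit says a pattern "is a simple prefix that is checked against the fully qualified class name", and the diff moves the logging prefixes into a constant. The sketch below illustrates both points. It is an assumption-laden illustration, not the Flink implementation: the class and method names are invented, and how `CoreOptions` actually concatenates the constant is cut off in the diff above, so the concatenation here is only one plausible form that reproduces the documented default value.

```java
/** Hypothetical sketch of parent-first prefix matching; not the Flink class-loading code. */
final class ParentFirstPatternsSketch {

    // Value copied from the PARENT_FIRST_LOGGING_PATTERNS constant in the diff.
    static final String LOGGING_PATTERNS =
            "org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback";

    // Assumed composition that yields the new documented default.
    static final String DEFAULT_PATTERNS =
            "java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;"
                    + "javax.annotation.;org.xml;javax.xml;org.apache.xerces;org.w3c;"
                    + LOGGING_PATTERNS;

    /** True if the class name starts with any of the semicolon-separated prefixes. */
    static boolean isParentFirst(String className) {
        for (String prefix : DEFAULT_PATTERNS.split(";")) {
            if (className.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isParentFirst("org.slf4j.Logger"));
        System.out.println(isParentFirst("com.example.MyJob"));
    }
}
```

Because matching is by prefix only, reordering the list (as the generated HTML shows) changes the displayed string but not which classes match.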
[flink] branch master updated (c874338 -> e341e47)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from c874338 [FLINK-23233][runtime] Ensure checkpoints confirmed after all the failed events processed for OepratorCoordinator new 5d3ca59 [FLINK-21368] Remove RpcService#getExecutor new e2d36ba [hotfix] Replace AskTimeException usages in tests new ae85de7 [hotfix] Remove akka dependency in MetricUtilsTest new b8ee568 [hotfix] Allow @Internal annotation on fields new 9cf4c48 [hotfix] Extract logging parent-first patterns into constant new 548d70a [FLINK-18783] RpcSystem extends AutoCloseable new ad2f2bc [FLINK-18783] RpcSystem#load accepts Configuration new 1aa7d16 [FLINK-18783] Add ComponentClassLoader new e341e47 [FLINK-18783] Load Akka with separate classloader The 9 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. 
Summary of changes: .../resource-providers/standalone/kubernetes.md| 4 +- .../resource-providers/standalone/kubernetes.md| 4 +- .../shortcodes/generated/core_configuration.html | 2 +- .../generated/expert_class_loading_section.html| 2 +- .../java/org/apache/flink/annotation/Internal.java | 2 +- flink-clients/pom.xml | 6 +- flink-connectors/flink-connector-base/pom.xml | 2 +- flink-connectors/flink-connector-cassandra/pom.xml | 2 +- .../flink-connector-elasticsearch-base/pom.xml | 2 +- .../flink-connector-elasticsearch5/pom.xml | 2 +- .../flink-connector-elasticsearch6/pom.xml | 2 +- .../flink-connector-elasticsearch7/pom.xml | 2 +- flink-connectors/flink-connector-files/pom.xml | 2 +- .../flink-connector-gcp-pubsub/pom.xml | 2 +- flink-connectors/flink-connector-hbase-1.4/pom.xml | 2 +- flink-connectors/flink-connector-hive/pom.xml | 2 +- flink-connectors/flink-connector-jdbc/pom.xml | 2 +- flink-connectors/flink-connector-kafka/pom.xml | 6 +- flink-connectors/flink-connector-kinesis/pom.xml | 4 +- flink-connectors/flink-connector-rabbitmq/pom.xml | 4 +- flink-container/pom.xml| 2 +- .../apache/flink/configuration/CoreOptions.java| 15 +- .../core/classloading/ComponentClassLoader.java| 266 ++ .../core/classloading/SubmoduleClassLoader.java| 45 +++ .../org/apache/flink/core/plugin/PluginLoader.java | 96 +- .../apache/flink/util/concurrent/FutureUtils.java | 26 +- .../classloading/ComponentClassLoaderTest.java | 268 +++ flink-dist/pom.xml | 17 +- flink-dist/src/main/assemblies/opt.xml | 4 +- flink-dist/src/main/assemblies/plugins.xml | 6 +- .../src/main/flink-bin/conf/log4j-cli.properties | 2 +- .../main/flink-bin/conf/log4j-console.properties | 2 +- .../main/flink-bin/conf/log4j-session.properties | 2 +- .../src/main/flink-bin/conf/log4j.properties | 2 +- .../src/main/flink-bin/conf/logback-console.xml| 2 +- flink-dist/src/main/flink-bin/conf/logback.xml | 2 +- flink-dist/src/main/resources/META-INF/NOTICE | 12 - flink-docs/pom.xml | 8 +- .../pom.xml| 2 +- 
.../flink-end-to-end-tests-common/pom.xml | 2 +- .../flink-metrics-availability-test/pom.xml| 2 +- .../flink-metrics-reporter-prometheus-test/pom.xml | 2 +- flink-end-to-end-tests/test-scripts/common.sh | 4 +- flink-examples/flink-examples-streaming/pom.xml| 2 +- flink-examples/flink-examples-table/pom.xml| 2 +- flink-formats/flink-avro/pom.xml | 2 +- flink-formats/flink-compress/pom.xml | 2 +- flink-formats/flink-csv/pom.xml| 2 +- flink-formats/flink-hadoop-bulk/pom.xml| 2 +- flink-formats/flink-json/pom.xml | 2 +- flink-formats/flink-orc/pom.xml| 2 +- flink-formats/flink-parquet/pom.xml| 2 +- flink-formats/flink-sequence-file/pom.xml | 2 +- flink-fs-tests/pom.xml | 4 +- flink-kubernetes/pom.xml | 6 +- flink-libraries/flink-cep/pom.xml | 4 +- flink-libraries/flink-gelly/pom.xml| 2 +- flink-libraries/flink-state-processing-api/pom.xml | 4 +- flink-metrics/flink-metrics-dropwizard/pom.xml | 4 +- flink-metrics/flink-metrics-influxdb/pom.xml | 6 +- flink-metrics/flink-metrics-jmx/pom.xml| 8 +- flink-metrics/flink-metrics-prometheus/pom.xml | 6 +- flink-metrics/flink-metrics-
[flink] 06/09: [FLINK-18783] RpcSystem extends AutoCloseable
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 548d70aa50a43b9a8c7c901bdefbb75b85959904 Author: Chesnay Schepler AuthorDate: Mon Jul 12 09:44:05 2021 +0200 [FLINK-18783] RpcSystem extends AutoCloseable --- .../src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java | 6 +- .../apache/flink/runtime/entrypoint/ClusterEntrypoint.java| 9 +++-- .../org/apache/flink/runtime/minicluster/MiniCluster.java | 11 ++- .../apache/flink/runtime/taskexecutor/TaskManagerRunner.java | 9 +++-- 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java index 3bc19bd..adc31c6 100644 --- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java +++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java @@ -27,7 +27,7 @@ import java.util.ServiceLoader; * This interface serves as a factory interface for RPC services, with some additional utilities * that are reliant on implementation details of the RPC service. */ -public interface RpcSystem extends RpcSystemUtils { +public interface RpcSystem extends RpcSystemUtils, AutoCloseable { /** * Returns a builder for an {@link RpcService} that is only reachable from the local machine. @@ -51,6 +51,10 @@ public interface RpcSystem extends RpcSystemUtils { @Nullable String externalAddress, String externalPortRange); +/** Hook to cleanup resources, like common thread pools or classloaders. */ +@Override +default void close() {} + /** Builder for {@link RpcService}. 
*/ interface RpcServiceBuilder { RpcServiceBuilder withComponentName(String name); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index c0d3063..8b24dfa 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -150,6 +150,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro private ExecutionGraphInfoStore executionGraphInfoStore; private final Thread shutDownHook; +private RpcSystem rpcSystem; protected ClusterEntrypoint(Configuration configuration) { this.configuration = generateClusterConfiguration(configuration); @@ -293,7 +294,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro LOG.info("Initializing cluster services."); synchronized (lock) { -final RpcSystem rpcSystem = RpcSystem.load(); +rpcSystem = RpcSystem.load(); commonRpcService = RpcUtils.createRemoteRpcService( @@ -499,8 +500,12 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro FutureUtils.composeAfterwards( shutDownApplicationFuture, () -> stopClusterServices(cleanupHaData)); +final CompletableFuture rpcSystemClassLoaderCloseFuture = +FutureUtils.runAfterwards(serviceShutdownFuture, rpcSystem::close); + final CompletableFuture cleanupDirectoriesFuture = -FutureUtils.runAfterwards(serviceShutdownFuture, this::cleanupDirectories); +FutureUtils.runAfterwards( +rpcSystemClassLoaderCloseFuture, this::cleanupDirectories); cleanupDirectoriesFuture.whenComplete( (Void ignored2, Throwable serviceThrowable) -> { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 3f884c8..c9f24d8 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -197,6 +197,9 @@ public class MiniCluster implements AutoCloseableAsync { /** Flag marking the mini cluster as started/running. */ private volatile boolean running; +@GuardedBy("lock") +private RpcSystem rpcSystem; + // /** @@ -273,7 +276,7 @@ public class MiniCluster implements AutoCloseableAsync { try { initializeIOFormatClasses(configuration); -final RpcSystem rpcSystem = RpcSystem.load(); +rpcSystem = RpcSystem.load();
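
The `RpcSystem` change adds `AutoCloseable` with a no-op `default void close() {}`, so existing implementations keep compiling while callers gain a cleanup hook. A minimal sketch of that interface shape is below. All names are hypothetical; only the pattern (an interface that extends `AutoCloseable` and narrows `close()` to a default method without a `throws` clause) is taken from the diff.

```java
/** Hypothetical sketch of a factory interface with a default no-op close hook. */
final class CloseableFactorySketch {

    interface ServiceFactory extends AutoCloseable {
        String create(String name);

        /** Hook to clean up shared resources; no-op unless an implementation overrides it. */
        @Override
        default void close() {}
    }

    /** Implementation that relies on the default close(). */
    static final class SimpleFactory implements ServiceFactory {
        @Override
        public String create(String name) {
            return "service-" + name;
        }
    }

    /** Implementation that records whether close() was called. */
    static final class RecordingFactory implements ServiceFactory {
        boolean closed;

        @Override
        public String create(String name) {
            return "service-" + name;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    static boolean closedAfterUse() {
        RecordingFactory factory = new RecordingFactory();
        // close() declares no checked exception, so no catch block is required here.
        try (ServiceFactory resource = factory) {
            resource.create("metrics");
        }
        return factory.closed;
    }

    public static void main(String[] args) {
        try (ServiceFactory simple = new SimpleFactory()) {
            System.out.println(simple.create("rpc"));
        }
        System.out.println(closedAfterUse());
    }
}
```

In the commit itself the close call is chained after service shutdown with `FutureUtils.runAfterwards` rather than try-with-resources; the sketch only covers the interface side of the change.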
[flink] 04/09: [hotfix] Allow @Internal annotation on fields
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b8ee5687bcf218d7f36bce082b06344eb83d9a01
Author: Chesnay Schepler
AuthorDate: Mon Jul 5 10:53:41 2021 +0200

    [hotfix] Allow @Internal annotation on fields
---
 .../src/main/java/org/apache/flink/annotation/Internal.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/Internal.java b/flink-annotations/src/main/java/org/apache/flink/annotation/Internal.java
index 8c2baf9..6531c84 100644
--- a/flink-annotations/src/main/java/org/apache/flink/annotation/Internal.java
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/Internal.java
@@ -29,6 +29,6 @@ import java.lang.annotation.Target;
  * Developer APIs are stable but internal to Flink and might change across releases.
  */
 @Documented
-@Target({ElementType.TYPE, ElementType.METHOD, ElementType.CONSTRUCTOR})
+@Target({ElementType.TYPE, ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.FIELD})
 @Public
 public @interface Internal {}
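
Adding `ElementType.FIELD` to `@Target` is what allows the annotation to be placed on a constant, as the later `CoreOptions` commit does. The sketch below demonstrates the effect with an invented annotation. `InternalMarker`, the field, and the runtime retention are assumptions made so the result can be checked reflectively; the retention of Flink's real `@Internal` is not shown in the diff.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Hypothetical sketch: an annotation whose @Target includes FIELD can mark a constant. */
final class FieldAnnotationSketch {

    // RUNTIME retention is chosen only so that reflection can see the marker (assumption).
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.FIELD})
    @interface InternalMarker {}

    // Without ElementType.FIELD in @Target, this usage would be a compile error.
    @InternalMarker
    static final String EXAMPLE_CONSTANT = "org.slf4j;ch.qos.logback";

    static boolean fieldIsMarked() {
        try {
            return FieldAnnotationSketch.class
                    .getDeclaredField("EXAMPLE_CONSTANT")
                    .isAnnotationPresent(InternalMarker.class);
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(fieldIsMarked());
    }
}
```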
[flink] 02/09: [hotfix] Replace AskTimeException usages in tests
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e2d36ba8f0e8b852f01ba6cef8924a6213a82f29
Author: Chesnay Schepler
AuthorDate: Thu Jul 1 10:16:09 2021 +0200

    [hotfix] Replace AskTimeException usages in tests
---
 .../resourcemanager/slotmanager/DeclarativeSlotManagerTest.java  | 3 +--
 .../resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java | 6 +++---
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
index d325bbf..5b826db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
@@ -56,7 +56,6 @@ import org.apache.flink.util.function.ThrowingConsumer;

 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;

-import akka.pattern.AskTimeoutException;
 import org.junit.Test;

 import java.util.ArrayList;
@@ -602,7 +601,7 @@ public class DeclarativeSlotManagerTest extends TestLogger {
         final BlockingQueue<Supplier<CompletableFuture<Acknowledge>>> responseQueue =
                 new ArrayBlockingQueue<>(2);
         responseQueue.add(
-                () -> FutureUtils.completedExceptionally(new AskTimeoutException("timeout")));
+                () -> FutureUtils.completedExceptionally(new TimeoutException("timeout")));
         responseQueue.add(
                 () -> {
                     secondSlotRequestFuture.complete(null);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java
index 62ecfe3..d00bbd2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.java
@@ -36,11 +36,11 @@ import org.apache.flink.runtime.testutils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;

-import akka.pattern.AskTimeoutException;
 import org.junit.Test;

 import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;

 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
@@ -126,7 +126,7 @@ public class DefaultSlotStatusSyncerTest extends TestLogger {
                         .setRequestSlotFunction(
                                 ignored ->
                                         FutureUtils.completedExceptionally(
-                                                new AskTimeoutException("timeout")))
+                                                new TimeoutException("timeout")))
                         .createTestingTaskExecutorGateway();
         final TaskExecutorConnection taskExecutorConnection =
                 new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway);
@@ -151,7 +151,7 @@ public class DefaultSlotStatusSyncerTest extends TestLogger {
         try {
             allocatedFuture.get();
         } catch (Exception e) {
-            assertThat(e.getCause(), instanceOf(AskTimeoutException.class));
+            assertThat(e.getCause(), instanceOf(TimeoutException.class));
         }
         assertThat(resourceTracker.getAcquiredResources(jobId), is(empty()));
         assertThat(
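
The updated assertion checks e.getCause() rather than e itself because CompletableFuture#get wraps the failure in an ExecutionException. A small sketch of that behaviour using only the JDK follows; the class and method names are made up for this illustration and do not appear in the Flink tests.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

final class TimeoutCauseSketch {

    /** Returns the cause reported by get() for an exceptionally completed future. */
    static Throwable causeOfFailedGet() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new TimeoutException("timeout"));
        try {
            future.get();
            return null; // not reached here, the future is already failed
        } catch (ExecutionException e) {
            // get() wraps the original exception, so the test inspects the cause.
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(causeOfFailedGet());
    }
}
```

Using the JDK's java.util.concurrent.TimeoutException keeps the tests independent of the Akka-specific exception type.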
[flink] 07/09: [FLINK-18783] RpcSystem#load accepts Configuration
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ad2f2bccf5f933a85cae9417e1fe0559075ece84
Author: Chesnay Schepler
AuthorDate: Thu Jul 1 13:59:01 2021 +0200

    [FLINK-18783] RpcSystem#load accepts Configuration
---
 .../src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java  | 10 ++
 .../org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java |  2 +-
 .../java/org/apache/flink/runtime/minicluster/MiniCluster.java |  2 +-
 .../apache/flink/runtime/taskexecutor/TaskManagerRunner.java   |  2 +-
 4 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
index adc31c6..5934c60 100644
--- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
+++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
@@ -78,6 +78,16 @@ public interface RpcSystem extends RpcSystemUtils, AutoCloseable {
      * @return loaded RpcSystem
      */
     static RpcSystem load() {
+        return load(new Configuration());
+    }
+
+    /**
+     * Loads the RpcSystem.
+     *
+     * @param config Flink configuration
+     * @return loaded RpcSystem
+     */
+    static RpcSystem load(Configuration config) {
         final ClassLoader classLoader = RpcSystem.class.getClassLoader();
         return ServiceLoader.load(RpcSystem.class, classLoader).iterator().next();
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 8b24dfa..07b0a71 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -294,7 +294,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
         LOG.info("Initializing cluster services.");

         synchronized (lock) {
-            rpcSystem = RpcSystem.load();
+            rpcSystem = RpcSystem.load(configuration);

             commonRpcService =
                     RpcUtils.createRemoteRpcService(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index c9f24d8..d083783 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -276,7 +276,7 @@ public class MiniCluster implements AutoCloseableAsync {
         try {
             initializeIOFormatClasses(configuration);

-            rpcSystem = RpcSystem.load();
+            rpcSystem = RpcSystem.load(configuration);

             LOG.info("Starting Metrics Registry");
             metricRegistry =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index aaf7aa1..48eec9f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -139,7 +139,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
             throws Exception {
         this.configuration = checkNotNull(configuration);

-        rpcSystem = RpcSystem.load();
+        rpcSystem = RpcSystem.load(configuration);

         timeout = Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
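
The RpcSystem hunk above keeps the no-argument load() and makes it delegate to a new load(Configuration) overload that resolves the implementation through java.util.ServiceLoader. The sketch below shows the same overload-plus-delegation shape with JDK types only. Greeter, the Map used in place of Flink's Configuration, and the fallback lambda are all assumptions made so the example runs on its own; the commit itself calls iterator().next() without a fallback.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.ServiceLoader;

final class LoadOverloadSketch {

    /** Hypothetical service interface standing in for RpcSystem. */
    public interface Greeter {
        String greet();
    }

    /** No-argument variant: delegates with an empty configuration. */
    static Greeter load() {
        return load(Collections.emptyMap());
    }

    /** Configuration-aware variant; a Map stands in for Flink's Configuration. */
    static Greeter load(Map<String, String> config) {
        ClassLoader classLoader = LoadOverloadSketch.class.getClassLoader();
        Iterator<Greeter> providers =
                ServiceLoader.load(Greeter.class, classLoader).iterator();
        if (providers.hasNext()) {
            return providers.next();
        }
        // Fallback (assumption) so the sketch runs without a META-INF/services entry.
        return () -> config.getOrDefault("greeting", "hello");
    }

    public static void main(String[] args) {
        System.out.println(load().greet());
        System.out.println(load(Collections.singletonMap("greeting", "hi")).greet());
    }
}
```

Keeping the old signature as a thin delegate lets existing callers compile unchanged while the three call sites in the diff move to the configuration-aware variant.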
[flink] 08/09: [FLINK-18783] Add ComponentClassLoader
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1aa7d16c1188dd70dd7cbf1a7e4acf0e0db8cea1
Author: Chesnay Schepler
AuthorDate: Tue Jul 6 16:59:30 2021 +0200

    [FLINK-18783] Add ComponentClassLoader

    The new ComponentClassLoader is intended as a CL that can eventually
    support all of our class-loading needs, including plugins, rpc systems and
    user-code.
---
 .../core/classloading/ComponentClassLoader.java    | 266
 .../org/apache/flink/core/plugin/PluginLoader.java |  96 +---
 .../classloading/ComponentClassLoaderTest.java     | 268 +
 3 files changed, 537 insertions(+), 93 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/classloading/ComponentClassLoader.java b/flink-core/src/main/java/org/apache/flink/core/classloading/ComponentClassLoader.java
new file mode 100644
index 000..39636a6
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/classloading/ComponentClassLoader.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.classloading;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.Iterator;
+
+/**
+ * A {@link URLClassLoader} that restricts which classes can be loaded to those contained within the
+ * given classpath, except classes from a given set of packages that are either loaded owner or
+ * component-first.
+ *
+ * <p>Depiction of the class loader hierarchy:
+ *
+ * <pre>
+ *       Owner     Bootstrap
+ *           ^         ^
+ *           |---------|
+ *                |
+ *            Component
+ * </pre>
+ *
+ * <p>For loading classes/resources, class loaders are accessed in one of the following orders:
+ *
+ * <ul>
+ *   <li>component-only: component -> bootstrap; default.
+ *   <li>component-first: component -> bootstrap -> owner; opt-in.
+ *   <li>owner-first: owner -> component -> bootstrap; opt-in.
+ * </ul>
+ */
+public class ComponentClassLoader extends URLClassLoader {
+    private static final ClassLoader PLATFORM_OR_BOOTSTRAP_LOADER;
+
+    private final ClassLoader ownerClassLoader;
+
+    private final String[] ownerFirstPackages;
+    private final String[] componentFirstPackages;
+    private final String[] ownerFirstResourcePrefixes;
+    private final String[] componentFirstResourcePrefixes;
+
+    public ComponentClassLoader(
+            URL[] classpath,
+            ClassLoader ownerClassLoader,
+            String[] ownerFirstPackages,
+            String[] componentFirstPackages) {
+        super(classpath, PLATFORM_OR_BOOTSTRAP_LOADER);
+        this.ownerClassLoader = ownerClassLoader;
+
+        this.ownerFirstPackages = ownerFirstPackages;
+        this.componentFirstPackages = componentFirstPackages;
+
+        ownerFirstResourcePrefixes = convertPackagePrefixesToPathPrefixes(ownerFirstPackages);
+        componentFirstResourcePrefixes =
+                convertPackagePrefixesToPathPrefixes(componentFirstPackages);
+    }
+
+    // --
+    // Class loading
+    // --
+
+    @Override
+    protected Class<?> loadClass(final String name, final boolean resolve)
+            throws ClassNotFoundException {
+        synchronized (getClassLoadingLock(name)) {
+            final Class<?> loadedClass = findLoadedClass(name);
+            if (loadedClass != null) {
+                return resolveIfNeeded(resolve, loadedClass);
+            }
+
+            if (isComponentFirstClass(name)) {
+                return loadClassFromComponentFirst(name, resolve);
+            }
+            if (isOwnerFirstClass(name)) {
+                return loadClassFromOwnerFirst(name, resolve);
+            }
+
+
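
The Javadoc above lists three lookup orders, and the visible part of loadClass checks the component-first packages before the owner-first ones. The following sketch models only that decision as a pure function, without doing any class loading. Matching by String#startsWith on package prefixes is an assumption (the bodies of isComponentFirstClass and isOwnerFirstClass are not part of the quoted diff), and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

final class LoadOrderSketch {

    /** Returns the order in which loaders would be consulted for the given class name. */
    static List<String> lookupOrder(
            String className, String[] ownerFirstPackages, String[] componentFirstPackages) {
        // Same precedence as the quoted loadClass: component-first wins over owner-first.
        if (startsWithAny(className, componentFirstPackages)) {
            return Arrays.asList("component", "bootstrap", "owner");
        }
        if (startsWithAny(className, ownerFirstPackages)) {
            return Arrays.asList("owner", "component", "bootstrap");
        }
        // Default: component-only, the owner is never consulted.
        return Arrays.asList("component", "bootstrap");
    }

    /** Prefix match on package names (assumed matching rule). */
    private static boolean startsWithAny(String name, String[] prefixes) {
        for (String prefix : prefixes) {
            if (name.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String[] ownerFirst = {"org.slf4j"};
        String[] componentFirst = {"com.example.plugin"};
        System.out.println(lookupOrder("org.slf4j.Logger", ownerFirst, componentFirst));
        System.out.println(lookupOrder("com.example.plugin.Main", ownerFirst, componentFirst));
        System.out.println(lookupOrder("java.util.List", ownerFirst, componentFirst));
    }
}
```

The package names in main are placeholders chosen for the example and are not taken from the commit.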
[flink-training] branch master updated (dd7be31 -> 5c8386d)
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git.

    from dd7be31  [FLINK-22868] Update to use Flink 1.13.1
     new 82620e0  [FLINK-23332][gradle] update Gradle version
     new 1504a64  [hotfix][gradle] update build-scan plugin version
     new 5c8386d  [hotfix][gradle] set project properties in a central place

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 build.gradle                             | 14 --
 gradle/wrapper/gradle-wrapper.properties |  2 +-
 settings.gradle                          |  2 +-
 3 files changed, 10 insertions(+), 8 deletions(-)
[flink-training] 03/03: [hotfix][gradle] set project properties in a central place
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git

commit 5c8386d022719f72fbb161c6727c0c2e2a3c12f1
Author: Nico Kruber
AuthorDate: Fri Jul 2 11:36:27 2021 +0200

    [hotfix][gradle] set project properties in a central place

    Also, we actually only have the root project's description, so let's not
    set it for all subprojects!
---
 build.gradle | 12 +++-
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/build.gradle b/build.gradle
index eb42442..7697737 100644
--- a/build.gradle
+++ b/build.gradle
@@ -19,6 +19,13 @@ plugins {
     id 'com.github.johnrengelman.shadow' version '7.0.0' apply false
 }

+description = "Flink Training Exercises"
+
+allprojects {
+    group = 'org.apache.flink'
+    version = '1.13-SNAPSHOT'
+}
+
 subprojects {
     apply plugin: 'java'
     apply plugin: 'scala' // optional; uncomment if needed
@@ -29,11 +36,6 @@ subprojects {
     apply plugin: 'checkstyle'
     apply plugin: 'eclipse'

-    // artifact properties
-    group = 'org.apache.flink'
-    version = '1.13-SNAPSHOT'
-    description = """Flink Training Exercises"""
-
     ext {
         javaVersion = '1.8'
         flinkVersion = '1.13.1'
[flink-training] 02/03: [hotfix][gradle] update build-scan plugin version
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git

commit 1504a6477abc465775a39d9073b184e36659225d
Author: Nico Kruber
AuthorDate: Fri Jul 2 11:17:24 2021 +0200

    [hotfix][gradle] update build-scan plugin version
---
 settings.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/settings.gradle b/settings.gradle
index 24ea327..5e98871 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,5 +1,5 @@
 plugins {
-    id "com.gradle.enterprise" version "3.2.1"
+    id "com.gradle.enterprise" version "3.6.3"
 }

 rootProject.name = 'flink-training'
[flink-training] 01/03: [FLINK-23332][gradle] update Gradle version
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git

commit 82620e098fa39c71538144b7f6a115163b071ccf
Author: Nico Kruber
AuthorDate: Fri Jul 2 11:09:02 2021 +0200

    [FLINK-23332][gradle] update Gradle version

    This also requires a newer 'shadow' version.
---
 build.gradle                             | 2 +-
 gradle/wrapper/gradle-wrapper.properties | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/build.gradle b/build.gradle
index b516b5d..eb42442 100644
--- a/build.gradle
+++ b/build.gradle
@@ -16,7 +16,7 @@
  */

 plugins {
-    id 'com.github.johnrengelman.shadow' version '5.2.0' apply false
+    id 'com.github.johnrengelman.shadow' version '7.0.0' apply false
 }

 subprojects {
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 6254d2d..69a9715 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-6.2.1-all.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-7.1-bin.zip
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
[flink-training] branch master updated: [FLINK-23334][gradle] let the subprojects decide whether they implement an application
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git

The following commit(s) were added to refs/heads/master by this push:
     new 63d360e  [FLINK-23334][gradle] let the subprojects decide whether they implement an application
63d360e is described below

commit 63d360e49368041a4bbdd320a5c4b3600961dab2
Author: Nico Kruber
AuthorDate: Fri Jul 2 11:37:19 2021 +0200

    [FLINK-23334][gradle] let the subprojects decide whether they implement an application

    This gives a cleaner gradle project file layout.
---
 build.gradle                  | 7 ---
 hourly-tips/build.gradle      | 2 ++
 long-ride-alerts/build.gradle | 2 ++
 ride-cleansing/build.gradle   | 2 ++
 rides-and-fares/build.gradle  | 2 ++
 5 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/build.gradle b/build.gradle
index 7697737..5d0c8e9 100644
--- a/build.gradle
+++ b/build.gradle
@@ -29,9 +29,6 @@ allprojects {
 subprojects {
     apply plugin: 'java'
     apply plugin: 'scala' // optional; uncomment if needed
-    if (project != project(":common")) {
-        apply plugin: 'application'
-    }
     apply plugin: 'com.github.johnrengelman.shadow'
     apply plugin: 'checkstyle'
     apply plugin: 'eclipse'
@@ -120,10 +117,6 @@ subprojects {
         }
     }

-    if (plugins.findPlugin('application')) {
-        applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
-        run.classpath = sourceSets.main.runtimeClasspath
-    }

     jar {
         manifest {
diff --git a/hourly-tips/build.gradle b/hourly-tips/build.gradle
index 11df511..75e3e60 100644
--- a/hourly-tips/build.gradle
+++ b/hourly-tips/build.gradle
@@ -1 +1,3 @@
+apply plugin: 'application'
+
 mainClassName = 'org.apache.flink.training.exercises.hourlytips.HourlyTipsExercise'
diff --git a/long-ride-alerts/build.gradle b/long-ride-alerts/build.gradle
index 62e4ffd..59b9448 100644
--- a/long-ride-alerts/build.gradle
+++ b/long-ride-alerts/build.gradle
@@ -1 +1,3 @@
+apply plugin: 'application'
+
 mainClassName = 'org.apache.flink.training.exercises.longrides.LongRidesExercise'
diff --git a/ride-cleansing/build.gradle b/ride-cleansing/build.gradle
index 106e3b9..1f03917 100644
--- a/ride-cleansing/build.gradle
+++ b/ride-cleansing/build.gradle
@@ -1 +1,3 @@
+apply plugin: 'application'
+
 mainClassName = 'org.apache.flink.training.exercises.ridecleansing.RideCleansingExercise'
diff --git a/rides-and-fares/build.gradle b/rides-and-fares/build.gradle
index 18366af..565aec6 100644
--- a/rides-and-fares/build.gradle
+++ b/rides-and-fares/build.gradle
@@ -1 +1,3 @@
+apply plugin: 'application'
+
 mainClassName = 'org.apache.flink.training.exercises.ridesandfares.RidesAndFaresExercise'
[flink-training] branch master updated: [FLINK-23336][log4j] update Log4J to 2.12.1 as used by Flink 1.13
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git

The following commit(s) were added to refs/heads/master by this push:
     new 7c323f4  [FLINK-23336][log4j] update Log4J to 2.12.1 as used by Flink 1.13
7c323f4 is described below

commit 7c323f4bb0a4659a57498c099dbe900ba023d43d
Author: Nico Kruber
AuthorDate: Fri Jul 2 11:42:41 2021 +0200

    [FLINK-23336][log4j] update Log4J to 2.12.1 as used by Flink 1.13
---
 build.gradle                                           |  9 +
 .../src/main/resources/log4j2.properties               | 13 +++--
 .../src/main/resources/log4j2.properties               | 13 +++--
 .../src/main/resources/log4j2.properties               | 13 +++--
 .../src/main/resources/log4j2.properties               | 13 +++--
 .../checkpointing/src/main/resources/log4j2.properties | 13 +++--
 .../src/main/resources/log4j2.properties               | 13 +++--
 .../introduction/src/main/resources/log4j2.properties  | 13 +++--
 .../object-reuse/src/main/resources/log4j2.properties  | 13 +++--
 .../exercise/src/main/resources/log4j2.properties      | 13 +++--
 .../src/main/resources/log4j2.properties               | 13 +++--
 .../solution/src/main/resources/log4j2.properties      | 13 +++--
 .../throughput/src/main/resources/log4j2.properties    | 13 +++--
 13 files changed, 89 insertions(+), 76 deletions(-)

diff --git a/build.gradle b/build.gradle
index 5d0c8e9..6077960 100644
--- a/build.gradle
+++ b/build.gradle
@@ -37,8 +37,7 @@ subprojects {
         javaVersion = '1.8'
         flinkVersion = '1.13.1'
         scalaBinaryVersion = '2.12'
-        slf4jVersion = '1.7.15'
-        log4jVersion = '1.2.17'
+        log4jVersion = '2.12.1'
         junitVersion = '4.12'
     }

@@ -73,6 +72,7 @@ subprojects {
         flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
         flinkShadowJar.exclude group: 'org.slf4j'
         flinkShadowJar.exclude group: 'log4j'
+        flinkShadowJar.exclude group: 'org.apache.logging.log4j', module: 'log4j-to-slf4j'

         // already provided dependencies from serializer frameworks
         flinkShadowJar.exclude group: 'com.esotericsoftware.kryo', module: 'kryo'
@@ -82,8 +82,9 @@ subprojects {

     // common set of dependencies
     dependencies {
-        implementation "log4j:log4j:${log4jVersion}"
-        implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"
+        implementation "org.apache.logging.log4j:log4j-slf4j-impl:${log4jVersion}"
+        implementation "org.apache.logging.log4j:log4j-api:${log4jVersion}"
+        implementation "org.apache.logging.log4j:log4j-core:${log4jVersion}"

         if (project != project(":common")) {
             implementation project(path: ':common')
diff --git a/rides-and-fares/src/main/resources/log4j.properties b/hourly-tips/src/main/resources/log4j2.properties
similarity index 78%
copy from rides-and-fares/src/main/resources/log4j.properties
copy to hourly-tips/src/main/resources/log4j2.properties
index da32ea0..8319d24 100644
--- a/rides-and-fares/src/main/resources/log4j.properties
+++ b/hourly-tips/src/main/resources/log4j2.properties
@@ -15,9 +15,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-
-log4j.rootLogger=INFO, console
-
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+loogers=rootLooger
+appender.console.type=Console
+appender.console.name=STDOUT
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
+rootLogger.level=INFO
+rootLogger.appenderRef.console.ref=STDOUT
\ No newline at end of file
diff --git a/hourly-tips/src/main/resources/log4j.properties b/long-ride-alerts/src/main/resources/log4j2.properties
similarity index 78%
rename from hourly-tips/src/main/resources/log4j.properties
rename to long-ride-alerts/src/main/resources/log4j2.properties
index da32ea0..8319d24 100644
--- a/hourly-tips/src/main/resources/log4j.properties
+++ b/long-ride-alerts/src/main/resources/log4j2.properties
@@ -15,9 +15,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-
-log4j.rootLogger=INFO, console
-
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.Conversion
[flink-training] branch master updated: [FLINK-23337][gradle] Properly use the 'shadow' plugin
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git

The following commit(s) were added to refs/heads/master by this push:
     new 453d65a  [FLINK-23337][gradle] Properly use the 'shadow' plugin
453d65a is described below

commit 453d65a3e0bfe244427a6e025a1798c677073289
Author: Nico Kruber
AuthorDate: Fri Jul 2 17:10:03 2021 +0200

    [FLINK-23337][gradle] Properly use the 'shadow' plugin

    This removes the need for the custom `flinkShadowJar` configuration and
    instead defines dependencies with the default ways that the 'shadow' plugin
    offers.
---
 build.gradle        | 69 ++---
 common/build.gradle |  8 ++-
 2 files changed, 30 insertions(+), 47 deletions(-)

diff --git a/build.gradle b/build.gradle
index 6077960..b13aaa9 100644
--- a/build.gradle
+++ b/build.gradle
@@ -60,62 +60,38 @@ subprojects {
         }
     }

-    // NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not run code
-    // in the IDE or with "gradle run". We also cannot exclude transitive dependencies from the
-    // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
-    // -> Explicitly define the // libraries we want to be included in the "flinkShadowJar" configuration!
-    configurations {
-        flinkShadowJar // dependencies which go into the shadowJar
-
-        // provided by Flink
-        flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
-        flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
-        flinkShadowJar.exclude group: 'org.slf4j'
-        flinkShadowJar.exclude group: 'log4j'
-        flinkShadowJar.exclude group: 'org.apache.logging.log4j', module: 'log4j-to-slf4j'
-
-        // already provided dependencies from serializer frameworks
-        flinkShadowJar.exclude group: 'com.esotericsoftware.kryo', module: 'kryo'
-        flinkShadowJar.exclude group: 'javax.servlet', module: 'servlet-api'
-        flinkShadowJar.exclude group: 'org.apache.httpcomponents', module: 'httpclient'
-    }
-
     // common set of dependencies
     dependencies {
-        implementation "org.apache.logging.log4j:log4j-slf4j-impl:${log4jVersion}"
-        implementation "org.apache.logging.log4j:log4j-api:${log4jVersion}"
-        implementation "org.apache.logging.log4j:log4j-core:${log4jVersion}"
+        shadow "org.apache.logging.log4j:log4j-slf4j-impl:${log4jVersion}"
+        shadow "org.apache.logging.log4j:log4j-api:${log4jVersion}"
+        shadow "org.apache.logging.log4j:log4j-core:${log4jVersion}"
+
+        shadow "org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}"
+        shadow "org.apache.flink:flink-java:${flinkVersion}"
+        shadow "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
+        shadow "org.apache.flink:flink-streaming-scala_${scalaBinaryVersion}:${flinkVersion}"

         if (project != project(":common")) {
             implementation project(path: ':common')
-            // transitive dependencies for flinkShadowJar need to be defined above
-            // (the alternative of using configuration: 'shadow' does not work there because that adds a dependency on
-            // the jar file, not the sources)
-            flinkShadowJar project(path: ':common', transitive: false)
-
             testImplementation(project(":common")) {
                 capabilities { requireCapability("$group:common-test") }
             }
         }
     }

-    // make flinkShadowJar dependencies available:
+    // add solution source dirs:
     sourceSets {
         main.java.srcDirs += 'src/solution/java'
         main.scala.srcDirs += 'src/solution/scala'
-        main.compileClasspath += configurations.flinkShadowJar
-        main.runtimeClasspath += configurations.flinkShadowJar
-        test.compileClasspath += configurations.flinkShadowJar
-        test.runtimeClasspath += configurations.flinkShadowJar
+        // Add shadow configuration to runtime class path so that the
+        // dynamically-generated tasks by IntelliJ are able to run and have
+        // all dependencies they need. (Luckily, this does not influence what
+        // ends up in the final shadowJar.)
+        main.runtimeClasspath += configurations.shadow

-        javadoc.classpath += configurations.flinkShadowJar
-    }
-
-    eclipse {
-        classpath {
-            plusConfigurations += [configurations.flinkShadowJar]
-        }
+        test.compileClasspath += configurations.shadow
+        test.runtimeClasspath += configurations.shadow
     }

@@ -127,7 +103,18 @@ subprojects {
     }

     shadowJar {
-        configurations = [project.configurations.flinkShadowJar]
+        mergeServiceFiles()
+        dependencies {
+            exclude(dependency("org.apache.
[flink] branch release-1.13 updated: [hotfix][runtime] Log slot pool status if unable to fulfill job requirements
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 037687c  [hotfix][runtime] Log slot pool status if unable to fulfill job requirements
037687c is described below

commit 037687c2ea1f1b81cc77b14c11ad529b30da4db5
Author: Roman Khachatryan
AuthorDate: Mon Jul 12 02:27:47 2021 +0200

    [hotfix][runtime] Log slot pool status if unable to fulfill job requirements
---
 .../runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java  | 9 ++---
 .../runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java | 8
 .../resourcemanager/slotmanager/DeclarativeSlotManager.java    | 5 -
 3 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index 1e9a2b4..c9dafd2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -348,14 +348,17 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem
             Collection<ResourceRequirement> acquiredResources) {
         assertRunningInMainThread();

-        failPendingRequests();
+        failPendingRequests(acquiredResources);
     }

-    private void failPendingRequests() {
+    private void failPendingRequests(Collection<ResourceRequirement> acquiredResources) {
         if (!pendingRequests.isEmpty()) {
             final NoResourceAvailableException cause =
                     new NoResourceAvailableException(
-                            "Could not acquire the minimum required resources.");
+                            "Could not acquire the minimum required resources. Acquired: "
+                                    + acquiredResources
+                                    + ". Current slot pool status: "
+                                    + getSlotServiceStatus());
             cancelPendingRequests(
                     request ->
                             !isBatchSlotRequestTimeoutCheckDisabled || !request.isBatchRequest(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
index 2c19471..ec1a970 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java
@@ -306,4 +306,12 @@ public class DeclarativeSlotPoolService implements SlotPoolService {
         STARTED,
         CLOSED,
     }
+
+    protected String getSlotServiceStatus() {
+        return String.format(
+                "Registered TMs: %d, registered slots: %d free slots: %d",
+                registeredTaskManagers.size(),
+                declarativeSlotPool.getAllSlotsInformation().size(),
+                declarativeSlotPool.getFreeSlotsInformation().size());
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
index ab8e155..aa8238f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
@@ -647,7 +647,10 @@ public class DeclarativeSlotManager implements SlotManager {
             pendingSlots = allocationResult.getNewAvailableResources();
             if (!allocationResult.isSuccessfulAllocating()
                     && sendNotEnoughResourceNotifications) {
-                LOG.warn("Could not fulfill resource requirements of job {}.", jobId);
+                LOG.warn(
+                        "Could not fulfill resource requirements of job {}. Free slots: {}",
+                        jobId,
+                        slotTracker.getFreeSlots().size());
                 resourceActions.notifyNotEnoughResourcesAvailable(
                         jobId, resourceTracker.getAcquiredResources(jobId));
                 return pendingSlots;
[flink] branch master updated: [hotfix][runtime] Log slot pool status if unable to fulfill job requirements
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new b1c5421 [hotfix][runtime] Log slot pool status if unable to fulfill job requirements b1c5421 is described below commit b1c5421a42c9e6bc5b16114fc75ccc7fb001c236 Author: Roman Khachatryan AuthorDate: Mon Jul 12 02:27:47 2021 +0200 [hotfix][runtime] Log slot pool status if unable to fulfill job requirements --- .../runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java| 9 ++--- .../runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java | 8 .../resourcemanager/slotmanager/DeclarativeSlotManager.java | 5 - 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java index e322df1..fbfd1ff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java @@ -348,14 +348,17 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem Collection acquiredResources) { assertRunningInMainThread(); -failPendingRequests(); +failPendingRequests(acquiredResources); } -private void failPendingRequests() { +private void failPendingRequests(Collection acquiredResources) { if (!pendingRequests.isEmpty()) { final NoResourceAvailableException cause = new NoResourceAvailableException( -"Could not acquire the minimum required resources."); +"Could not acquire the minimum required resources. Acquired: " ++ acquiredResources ++ ". 
Current slot pool status: " ++ getSlotServiceStatus()); cancelPendingRequests( request -> !isBatchSlotRequestTimeoutCheckDisabled || !request.isBatchRequest(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java index 2c19471..ec1a970 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java @@ -306,4 +306,12 @@ public class DeclarativeSlotPoolService implements SlotPoolService { STARTED, CLOSED, } + +protected String getSlotServiceStatus() { +return String.format( +"Registered TMs: %d, registered slots: %d free slots: %d", +registeredTaskManagers.size(), +declarativeSlotPool.getAllSlotsInformation().size(), +declarativeSlotPool.getFreeSlotsInformation().size()); +} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java index 07fa560..18809e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java @@ -647,7 +647,10 @@ public class DeclarativeSlotManager implements SlotManager { pendingSlots = allocationResult.getNewAvailableResources(); if (!allocationResult.isSuccessfulAllocating() && sendNotEnoughResourceNotifications) { -LOG.warn("Could not fulfill resource requirements of job {}.", jobId); +LOG.warn( +"Could not fulfill resource requirements of job {}. 
Free slots: {}", +jobId, +slotTracker.getFreeSlots().size()); resourceActions.notifyNotEnoughResourcesAvailable( jobId, resourceTracker.getAcquiredResources(jobId)); return pendingSlots;
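The enriched failure message from this hotfix can be illustrated with a minimal standalone sketch. It is not the Flink code: the class `SlotStatusMessage` and both method names are hypothetical, and plain integer counts stand in for `registeredTaskManagers.size()` and the two slot-information collections. The format string is copied as it appears in the diff, including the missing comma before "free slots".

```java
/** Hypothetical stand-in for the status helper added in the diff above; not Flink's class. */
final class SlotStatusMessage {

    // Mirrors the format string in the diff; the counts are passed in directly (assumption).
    static String slotServiceStatus(int registeredTaskManagers, int registeredSlots, int freeSlots) {
        return String.format(
                "Registered TMs: %d, registered slots: %d free slots: %d",
                registeredTaskManagers, registeredSlots, freeSlots);
    }

    // Builds the exception text the same way the patched failPendingRequests does.
    static String failureMessage(Object acquiredResources, String status) {
        return "Could not acquire the minimum required resources. Acquired: "
                + acquiredResources
                + ". Current slot pool status: "
                + status;
    }

    public static void main(String[] args) {
        String status = slotServiceStatus(2, 8, 0);
        System.out.println(failureMessage(java.util.Collections.emptyList(), status));
    }
}
```

With the counts in the message, a reader of the exception can tell apart "no TaskManagers registered" from "TaskManagers registered but no free slots" without searching earlier logs.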
[flink-training] branch master updated: [FLINK-23335][gradle] add separate run tasks for Java/Scala exercises and solutions
This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-training.git The following commit(s) were added to refs/heads/master by this push: new 9e90776 [FLINK-23335][gradle] add separate run tasks for Java/Scala exercises and solutions 9e90776 is described below commit 9e907760965a883881d7891baa0d6a82011cb626 Author: Nico Kruber AuthorDate: Fri Jul 2 15:43:13 2021 +0200 [FLINK-23335][gradle] add separate run tasks for Java/Scala exercises and solutions This allows to quickly run the current state of the exercise or the solution implementation from the command line / IDE. --- README.md | 19 ++- build.gradle | 39 +++ hourly-tips/build.gradle | 7 ++- long-ride-alerts/build.gradle | 7 ++- ride-cleansing/build.gradle | 7 ++- rides-and-fares/build.gradle | 7 ++- 6 files changed, 81 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 368b904..834032a 100644 --- a/README.md +++ b/README.md @@ -176,7 +176,24 @@ Each of these exercises includes an `...Exercise` class with most of the necessa > **:information_source: Note:** As long as your `...Exercise` class is > throwing a `MissingSolutionException`, the provided JUnit test classes will > ignore that failure and verify the correctness of the solution > implementation instead. -There are Java and Scala versions of all the exercise, test, and solution classes. +There are Java and Scala versions of all the exercise, test, and solution classes, each of which can be run from IntelliJ as usual. + + Running Exercises, Tests, and Solutions on the Command Line + +You can execute exercises, solutions, and tests via `gradlew` from a CLI. 
+ +- Tests can be executed as usual: + +```bash +./gradlew test +./gradlew ::test +``` + +- For Java/Scala exercises and solutions, we provide special tasks that are listed via + +```bash +./gradlew printRunTasks +``` - diff --git a/build.gradle b/build.gradle index b13aaa9..3702325 100644 --- a/build.gradle +++ b/build.gradle @@ -94,6 +94,18 @@ subprojects { test.runtimeClasspath += configurations.shadow } +project.plugins.withId('application') { +['javaExerciseClassName', 'scalaExerciseClassName', + 'javaSolutionClassName', 'scalaSolutionClassName'].each { property -> +if (project.ext.has(property)) { +project.tasks.create(name: classNamePropertyToTaskName(property), type: JavaExec) { +classpath = project.sourceSets.main.runtimeClasspath +mainClass = project.ext.get(property) +group = 'flink-training' +} +} +} +} jar { manifest { @@ -119,3 +131,30 @@ subprojects { assemble.dependsOn(shadowJar) } + +tasks.register('printRunTasks') { +println '' +println 'Flink Training Tasks runnable from root project \'' + project.name + '\'' +println '' + +subprojects.findAll { project -> +boolean first = true; +project.tasks.withType(JavaExec) { task -> +if (task.group == 'flink-training') { +if (first) { +println '' +println '> Subproject \'' + project.name + '\'' +first = false; +} +println './gradlew :' + project.name + ':' + task.name +} +} +} +} + +static def classNamePropertyToTaskName(String property) { +return 'run' + +property.charAt(0).toString().toUpperCase() + +property.substring(1, property.lastIndexOf('ClassName')) + +} diff --git a/hourly-tips/build.gradle b/hourly-tips/build.gradle index 75e3e60..20bcd13 100644 --- a/hourly-tips/build.gradle +++ b/hourly-tips/build.gradle @@ -1,3 +1,8 @@ +ext.javaExerciseClassName = 'org.apache.flink.training.exercises.hourlytips.HourlyTipsExercise' +ext.scalaExerciseClassName = 'org.apache.flink.training.exercises.hourlytips.scala.HourlyTipsExercise' +ext.javaSolutionClassName = 
'org.apache.flink.training.solutions.hourlytips.HourlyTipsSolution' +ext.scalaSolutionClassName = 'org.apache.flink.training.solutions.hourlytips.scala.HourlyTipsSolution' + apply plugin: 'application' -mainClassName = 'org.apache.flink.training.exercises.hourlytips.HourlyTipsExercise' +mainClassName = ext.javaExerciseClassName diff --git a/long-ride-alerts/build.gradle b/long-ride-alerts/build.gradle index 59b9448..8036a4f 100644 --- a/long-ride-alerts/build.gradle +++ b/long-ride-alerts/build.gradle @@ -1,3 +1,8 @@ +ext.javaExerciseClassName = 'org.apache.flink.training.exercises.lo
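The mapping from the `ext.*ClassName` properties to Gradle task names, done by `classNamePropertyToTaskName` in the root `build.gradle` above, can be sketched in Java as follows. This is a port for illustration only; the class name `RunTaskNames` is hypothetical, and the logic assumes every property name ends in `ClassName`, as the four names in the diff do.

```java
/** Hypothetical Java port of the Groovy helper classNamePropertyToTaskName. */
final class RunTaskNames {

    // 'run' + upper-cased first character + the rest of the name up to the 'ClassName' suffix.
    static String classNamePropertyToTaskName(String property) {
        int suffixStart = property.lastIndexOf("ClassName");
        return "run"
                + Character.toUpperCase(property.charAt(0))
                + property.substring(1, suffixStart);
    }

    public static void main(String[] args) {
        String[] properties = {
            "javaExerciseClassName", "scalaExerciseClassName",
            "javaSolutionClassName", "scalaSolutionClassName"
        };
        for (String property : properties) {
            System.out.println(property + " -> " + classNamePropertyToTaskName(property));
        }
    }
}
```

Under this mapping, `javaExerciseClassName` yields a task named `runJavaExercise`, so a subproject that sets the property would presumably be run with a command of the form `./gradlew :<subproject>:runJavaExercise`, matching the lines printed by `printRunTasks`.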
[flink-training] 03/03: [FLINK-23338] Add .git-blame-ignore-revs for ignoring refactor commit
This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-training.git commit 5d17ed7eafbc636a510b4f933101226835fefa24 Author: Nico Kruber AuthorDate: Fri Jul 2 17:46:10 2021 +0200 [FLINK-23338] Add .git-blame-ignore-revs for ignoring refactor commit This file can be used via: $ git config blame.ignoreRevsFile .git-blame-ignore-revs This closes #27. --- .git-blame-ignore-revs | 1 + 1 file changed, 1 insertion(+) diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs new file mode 100644 index 000..b41871e --- /dev/null +++ b/.git-blame-ignore-revs @@ -0,0 +1 @@ +c4baefd1830abe3ba9bcf200be0f5d00095cae12
[flink-training] 02/03: [FLINK-23338] Format code with Spotless/google-java-format
This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-training.git commit c4baefd1830abe3ba9bcf200be0f5d00095cae12 Author: Rufus Refactor AuthorDate: Tue Jul 13 21:45:34 2021 +0200 [FLINK-23338] Format code with Spotless/google-java-format --- README.md | 2 +- .../examples/ridecount/RideCountExample.java | 60 +-- .../exercises/common/datatypes/TaxiFare.java | 154 .../exercises/common/datatypes/TaxiRide.java | 297 +++--- .../common/sources/TaxiFareGenerator.java | 35 +- .../common/sources/TaxiRideGenerator.java | 109 +++--- .../exercises/common/utils/DataGenerator.java | 304 +++ .../exercises/common/utils/ExerciseBase.java | 98 ++--- .../training/exercises/common/utils/GeoUtils.java | 433 ++--- .../common/utils/MissingSolutionException.java | 10 +- .../exercises/testing/TaxiRideTestBase.java| 291 +++--- .../training/exercises/testing/TestSource.java | 44 +-- hourly-tips/DISCUSSION.md | 34 +- .../exercises/hourlytips/HourlyTipsExercise.java | 38 +- .../solutions/hourlytips/HourlyTipsSolution.java | 92 ++--- .../exercises/hourlytips/HourlyTipsTest.java | 97 ++--- long-ride-alerts/DISCUSSION.md | 2 +- long-ride-alerts/README.md | 8 +- .../exercises/longrides/LongRidesExercise.java | 61 ++- .../solutions/longrides/LongRidesSolution.java | 136 +++ .../exercises/longrides/LongRidesTest.java | 178 + .../ridecleansing/RideCleansingExercise.java | 57 ++- .../ridecleansing/RideCleansingSolution.java | 58 +-- .../exercises/ridecleansing/RideCleansingTest.java | 73 ++-- .../ridesandfares/RidesAndFaresExercise.java | 69 ++-- .../ridesandfares/RidesAndFaresSolution.java | 162 .../exercises/ridesandfares/RidesAndFaresTest.java | 93 ++--- 27 files changed, 1489 insertions(+), 1506 deletions(-) diff --git a/README.md b/README.md index 834032a..37c1d26 100644 --- a/README.md +++ b/README.md @@ -111,7 +111,7 @@ Once that’s done you should be able to open 
[`RideCleansingTest`](ride-cleansi ## Using the Taxi Data Streams -These exercises use data [generators](common/src/main/java/org/apache/flink/training/exercises/common/sources) that produce simulated event streams +These exercises use data [generators](common/src/main/java/org/apache/flink/training/exercises/common/sources) that produce simulated event streams inspired by those shared by the [New York City Taxi & Limousine Commission](http://www.nyc.gov/html/tlc/html/home/home.shtml) in their public [data set](https://uofi.app.box.com/NYCtaxidata) about taxi rides in New York City. diff --git a/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java b/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java index 74e56fe..36b7b89 100644 --- a/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java +++ b/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java @@ -29,44 +29,46 @@ import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator; /** * Example that counts the rides for each driver. * - * Note that this is implicitly keeping state for each driver. - * This sort of simple, non-windowed aggregation on an unbounded set of keys will use an unbounded amount of state. - * When this is an issue, look at the SQL/Table API, or ProcessFunction, or state TTL, all of which provide + * Note that this is implicitly keeping state for each driver. This sort of simple, non-windowed + * aggregation on an unbounded set of keys will use an unbounded amount of state. When this is an + * issue, look at the SQL/Table API, or ProcessFunction, or state TTL, all of which provide * mechanisms for expiring state for stale keys. */ public class RideCountExample { - /** -* Main method. -* -* @throws Exception which occurs during job execution. -*/ - public static void main(String[] args) throws Exception { +/** + * Main method. 
+ * + * @throws Exception which occurs during job execution. + */ +public static void main(String[] args) throws Exception { - // set up streaming execution environment - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +// set up streaming execution environment +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - // start the data generator - DataStream rides = env.addSource(new TaxiRideGenerator()); +// start
[flink-training] 01/03: [FLINK-23338] Add Spotless plugin with Google AOSP style
This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-training.git commit 8399d88b3638d69db01cea341631979a281d1072 Author: Nico Kruber AuthorDate: Fri Jul 2 17:36:31 2021 +0200 [FLINK-23338] Add Spotless plugin with Google AOSP style Use the same format as defined by the main Flink project. See https://issues.apache.org/jira/browse/FLINK-20651 --- build.gradle | 27 ++ config/checkstyle/checkstyle.xml | 891 ++--- config/checkstyle/suppressions.xml | 10 +- 3 files changed, 460 insertions(+), 468 deletions(-) diff --git a/build.gradle b/build.gradle index 3702325..0442b26 100644 --- a/build.gradle +++ b/build.gradle @@ -17,6 +17,7 @@ plugins { id 'com.github.johnrengelman.shadow' version '7.0.0' apply false +id "com.diffplug.spotless" version "5.14.0" apply false } description = "Flink Training Exercises" @@ -24,6 +25,20 @@ description = "Flink Training Exercises" allprojects { group = 'org.apache.flink' version = '1.13-SNAPSHOT' + +apply plugin: 'com.diffplug.spotless' + +spotless { +format 'misc', { +// define the files to apply `misc` to +target '*.gradle', '*.md', '.gitignore' + +// define the steps to apply to those files +trimTrailingWhitespace() +indentWithSpaces(4) +endWithNewline() +} +} } subprojects { @@ -107,6 +122,18 @@ subprojects { } } +spotless { +java { +googleJavaFormat('1.7').aosp() + +// \# refers to static imports +importOrder('org.apache.flink', 'org.apache.flink.shaded', '', 'javax', 'java', 'scala', '\\#') +removeUnusedImports() + +targetExclude("**/generated*/*.java") +} +} + jar { manifest { attributes 'Built-By': System.getProperty('user.name'), diff --git a/config/checkstyle/checkstyle.xml b/config/checkstyle/checkstyle.xml index e656fda..56d219d 100644 --- a/config/checkstyle/checkstyle.xml +++ b/config/checkstyle/checkstyle.xml @@ -31,540 +31,505 @@ This file is based on the checkstyle file of Apache Beam. 
[Remainder of the config/checkstyle/checkstyle.xml diff: the XML markup was lost in extraction and only the +/- line markers survive. One comment fragment is still legible: "IllegalImport cannot blacklist classes so we have to fall back to Regexp."]
[flink-training] branch master updated (9e90776 -> 5d17ed7)
This is an automated email from the ASF dual-hosted git repository. nkruber pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink-training.git. from 9e90776 [FLINK-23335][gradle] add separate run tasks for Java/Scala exercises and solutions new 8399d88 [FLINK-23338] Add Spotless plugin with Google AOSP style new c4baefd [FLINK-23338] Format code with Spotless/google-java-format new 5d17ed7 [FLINK-23338] Add .git-blame-ignore-revs for ignoring refactor commit The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .git-blame-ignore-revs | 1 + README.md | 2 +- build.gradle | 27 + .../examples/ridecount/RideCountExample.java | 60 +- .../exercises/common/datatypes/TaxiFare.java | 154 ++-- .../exercises/common/datatypes/TaxiRide.java | 297 --- .../common/sources/TaxiFareGenerator.java | 35 +- .../common/sources/TaxiRideGenerator.java | 109 +-- .../exercises/common/utils/DataGenerator.java | 304 --- .../exercises/common/utils/ExerciseBase.java | 98 +-- .../training/exercises/common/utils/GeoUtils.java | 433 +- .../common/utils/MissingSolutionException.java | 10 +- .../exercises/testing/TaxiRideTestBase.java| 291 +++ .../training/exercises/testing/TestSource.java | 44 +- config/checkstyle/checkstyle.xml | 891 ++--- config/checkstyle/suppressions.xml | 10 +- hourly-tips/DISCUSSION.md | 34 +- .../exercises/hourlytips/HourlyTipsExercise.java | 38 +- .../solutions/hourlytips/HourlyTipsSolution.java | 92 +-- .../exercises/hourlytips/HourlyTipsTest.java | 97 +-- long-ride-alerts/DISCUSSION.md | 2 +- long-ride-alerts/README.md | 8 +- .../exercises/longrides/LongRidesExercise.java | 61 +- .../solutions/longrides/LongRidesSolution.java | 136 ++-- .../exercises/longrides/LongRidesTest.java | 178 ++-- .../ridecleansing/RideCleansingExercise.java | 57 +- 
.../ridecleansing/RideCleansingSolution.java | 58 +- .../exercises/ridecleansing/RideCleansingTest.java | 73 +- .../ridesandfares/RidesAndFaresExercise.java | 69 +- .../ridesandfares/RidesAndFaresSolution.java | 162 ++-- .../exercises/ridesandfares/RidesAndFaresTest.java | 93 +-- 31 files changed, 1950 insertions(+), 1974 deletions(-) create mode 100644 .git-blame-ignore-revs
[flink] 02/02: [hotfix] Remove unnecessary warning suppression.
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 6451a1c169fbde44c2c7c458cc717c0fcad01cf9 Author: Stephan Ewen AuthorDate: Tue Jul 13 15:05:25 2021 +0200 [hotfix] Remove unnecessary warning suppression. This suppression was added to suppress compiler warning about 'finally' not returning from 'System.exit()'. With the introduction of the indirection to the security manager, this compiler warning no longer happens. --- .../src/main/java/org/apache/flink/util/FatalExitExceptionHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/FatalExitExceptionHandler.java b/flink-core/src/main/java/org/apache/flink/util/FatalExitExceptionHandler.java index 2433398..8ff0e9b 100644 --- a/flink-core/src/main/java/org/apache/flink/util/FatalExitExceptionHandler.java +++ b/flink-core/src/main/java/org/apache/flink/util/FatalExitExceptionHandler.java @@ -37,7 +37,6 @@ public final class FatalExitExceptionHandler implements Thread.UncaughtException public static final int EXIT_CODE = -17; @Override -@SuppressWarnings("finally") public void uncaughtException(Thread t, Throwable e) { try { LOG.error(
[flink] 01/02: [FLINK-22545][coordination] Fix check during creation of Source Coordinator thread.
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 4b867d8a6c187d6bef45294a6e6a524254410167 Author: Stephan Ewen AuthorDate: Tue Jul 13 14:36:32 2021 +0200 [FLINK-22545][coordination] Fix check during creation of Source Coordinator thread. The check was meant as a safeguard to prevent re-instantiation after fatal errors killed a previous thread. But the check was susceptible to thread termination due to idleness in the executor. This updates the check to only fail if there is in fact an instantiation next to a running thread, or after a previously crashed thread. --- .../coordinator/SourceCoordinatorProvider.java | 33 +- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java index bb6f835..56248f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java @@ -27,6 +27,8 @@ import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator; import org.apache.flink.util.FatalExitExceptionHandler; +import javax.annotation.Nullable; + import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -85,13 +87,16 @@ public class SourceCoordinatorProvider } /** A thread factory class that provides some helper methods. 
*/ -public static class CoordinatorExecutorThreadFactory implements ThreadFactory { +public static class CoordinatorExecutorThreadFactory +implements ThreadFactory, Thread.UncaughtExceptionHandler { private final String coordinatorThreadName; private final ClassLoader cl; private final Thread.UncaughtExceptionHandler errorHandler; -private Thread t; +@Nullable private Thread t; + +@Nullable private volatile Throwable previousFailureReason; CoordinatorExecutorThreadFactory( final String coordinatorThreadName, final ClassLoader contextClassLoader) { @@ -110,18 +115,32 @@ public class SourceCoordinatorProvider @Override public synchronized Thread newThread(Runnable r) { -if (t != null) { +if (t != null && t.isAlive()) { +throw new Error( +"Source Coordinator Thread already exists. There should never be more than one " ++ "thread driving the actions of a Source Coordinator. Existing Thread: " ++ t); +} +if (t != null && previousFailureReason != null) { throw new Error( -"This indicates that a fatal error has happened and caused the " -+ "coordinator executor thread to exit. Check the earlier logs " -+ "to see the root cause of the problem."); +"The following fatal error has happened in a previously spawned " ++ "Source Coordinator thread. No new thread can be spawned.", +previousFailureReason); } t = new Thread(r, coordinatorThreadName); t.setContextClassLoader(cl); -t.setUncaughtExceptionHandler(errorHandler); +t.setUncaughtExceptionHandler(this); return t; } +@Override +public synchronized void uncaughtException(Thread t, Throwable e) { +if (previousFailureReason == null) { +previousFailureReason = e; +} +errorHandler.uncaughtException(t, e); +} + String getCoordinatorThreadName() { return coordinatorThreadName; }
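The corrected check in FLINK-22545 can be reduced to a small standalone sketch. It is a simplified illustration, not the Flink class: the name `GuardedThreadFactory` is hypothetical, the context class loader handling is left out, and it throws `IllegalStateException` where the patch throws `Error`. The point it shows is the distinction the commit message draws: a previous thread that simply ended (for example through executor idleness) does not block a respawn, while a thread that is still running, or one that died from an uncaught exception, does.

```java
/** Hypothetical, simplified sketch of the guard in the patch above; not Flink's implementation. */
final class GuardedThreadFactory
        implements java.util.concurrent.ThreadFactory, Thread.UncaughtExceptionHandler {

    private final String threadName;
    private final Thread.UncaughtExceptionHandler delegate;

    private Thread current;                     // last thread handed out, if any
    private volatile Throwable previousFailure; // first uncaught error of a spawned thread

    GuardedThreadFactory(String threadName, Thread.UncaughtExceptionHandler delegate) {
        this.threadName = threadName;
        this.delegate = delegate;
    }

    @Override
    public synchronized Thread newThread(Runnable r) {
        // Reject a second thread next to one that is still running.
        if (current != null && current.isAlive()) {
            throw new IllegalStateException("Thread already exists: " + current);
        }
        // Reject a respawn after a crash; a thread that merely finished is fine.
        if (previousFailure != null) {
            throw new IllegalStateException(
                    "A previously spawned thread failed. No new thread can be spawned.",
                    previousFailure);
        }
        current = new Thread(r, threadName);
        current.setUncaughtExceptionHandler(this); // record the failure before delegating
        return current;
    }

    @Override
    public synchronized void uncaughtException(Thread t, Throwable e) {
        if (previousFailure == null) {
            previousFailure = e;
        }
        delegate.uncaughtException(t, e);
    }

    public static void main(String[] args) throws InterruptedException {
        GuardedThreadFactory factory =
                new GuardedThreadFactory("coordinator", (t, e) -> System.out.println("failed: " + e));

        Thread idle = factory.newThread(() -> {});
        idle.start();
        idle.join(); // ended normally, so a respawn is allowed

        Thread crashing = factory.newThread(() -> { throw new RuntimeException("boom"); });
        crashing.start();
        crashing.join(); // ended with an uncaught exception

        try {
            factory.newThread(() -> {});
        } catch (IllegalStateException expected) {
            System.out.println("respawn rejected, cause: " + expected.getCause());
        }
    }
}
```

Registering the factory itself as the thread's uncaught exception handler is what lets it remember the failure before passing it on to the original handler, which mirrors the `setUncaughtExceptionHandler(this)` change in the diff.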
[flink] branch master updated (b1c5421 -> 6451a1c)
This is an automated email from the ASF dual-hosted git repository. sewen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from b1c5421 [hotfix][runtime] Log slot pool status if unable to fulfill job requirements new 4b867d8 [FLINK-22545][coordination] Fix check during creation of Source Coordinator thread. new 6451a1c [hotfix] Remove unnecessary warning suppression. The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../flink/util/FatalExitExceptionHandler.java | 1 - .../coordinator/SourceCoordinatorProvider.java | 33 +- 2 files changed, 26 insertions(+), 8 deletions(-)
[flink] branch release-1.13 updated: [FLINK-22545][coordination] Fix check during creation of Source Coordinator thread.
This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.13 by this push: new 17a294d [FLINK-22545][coordination] Fix check during creation of Source Coordinator thread. 17a294d is described below commit 17a294daacdc67b1939607a41e67e56af2fa6888 Author: Stephan Ewen AuthorDate: Tue Jul 13 14:36:32 2021 +0200 [FLINK-22545][coordination] Fix check during creation of Source Coordinator thread. The check was meant as a safeguard to prevent re-instantiation after fatal errors killed a previous thread. But the check was susceptible to thread termination due to idleness in the executor. This updates the check to only fail if there is in fact an instantiation next to a running thread, or after a previously crashed thread. --- .../coordinator/SourceCoordinatorProvider.java | 33 +- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java index 563ed28..1660027 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java @@ -27,6 +27,8 @@ import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator; import org.apache.flink.runtime.util.FatalExitExceptionHandler; +import javax.annotation.Nullable; + import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -85,13 +87,16 @@ public class SourceCoordinatorProvider } /** A thread factory class that provides some helper 
methods. */ -public static class CoordinatorExecutorThreadFactory implements ThreadFactory { +public static class CoordinatorExecutorThreadFactory +implements ThreadFactory, Thread.UncaughtExceptionHandler { private final String coordinatorThreadName; private final ClassLoader cl; private final Thread.UncaughtExceptionHandler errorHandler; -private Thread t; +@Nullable private Thread t; + +@Nullable private volatile Throwable previousFailureReason; CoordinatorExecutorThreadFactory( final String coordinatorThreadName, final ClassLoader contextClassLoader) { @@ -110,18 +115,32 @@ public class SourceCoordinatorProvider @Override public synchronized Thread newThread(Runnable r) { -if (t != null) { +if (t != null && t.isAlive()) { +throw new Error( +"Source Coordinator Thread already exists. There should never be more than one " ++ "thread driving the actions of a Source Coordinator. Existing Thread: " ++ t); +} +if (t != null && previousFailureReason != null) { throw new Error( -"This indicates that a fatal error has happened and caused the " -+ "coordinator executor thread to exit. Check the earlier logs" -+ "to see the root cause of the problem."); +"The following fatal error has happened in a previously spawned " ++ "Source Coordinator thread. No new thread can be spawned.", +previousFailureReason); } t = new Thread(r, coordinatorThreadName); t.setContextClassLoader(cl); -t.setUncaughtExceptionHandler(errorHandler); +t.setUncaughtExceptionHandler(this); return t; } +@Override +public synchronized void uncaughtException(Thread t, Throwable e) { +if (previousFailureReason == null) { +previousFailureReason = e; +} +errorHandler.uncaughtException(t, e); +} + String getCoordinatorThreadName() { return coordinatorThreadName; }
[flink] branch release-1.12 updated (2c3f8f6 -> 331ae2c)
This is an automated email from the ASF dual-hosted git repository. sewen pushed a change to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git. from 2c3f8f6 [FLINK-23233][runtime] Ensure checkpoints confirmed after all the failed events processed for OepratorCoordinator add 331ae2c [FLINK-22545][coordination] Fix check during creation of Source Coordinator thread. No new revisions were added by this update. Summary of changes: .../coordinator/SourceCoordinatorProvider.java | 33 +- 1 file changed, 26 insertions(+), 7 deletions(-)
[flink] branch master updated (6451a1c -> 2dec1fa)
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 6451a1c [hotfix] Remove unnecessary warning suppression. add 2dec1fa [hotfix][docs][kafka] Corrected idleness attribute name No new revisions were added by this update. Summary of changes: docs/content.zh/docs/connectors/datastream/kafka.md | 2 +- docs/content/docs/connectors/datastream/kafka.md| 2 +- 2 files changed, 2 insertions(+), 2 deletions(-)
[flink] branch master updated (2dec1fa -> 08b2048)
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 2dec1fa [hotfix][docs][kafka] Corrected idleness attribute name add 08b2048 [FLINK-23347][docs][datatream] Updated operators windowAll description No new revisions were added by this update. Summary of changes: docs/content.zh/docs/dev/datastream/operators/overview.md | 2 +- docs/content/docs/dev/datastream/operators/overview.md| 2 +- 2 files changed, 2 insertions(+), 2 deletions(-)
[flink] branch master updated (08b2048 -> df4fa8a)
This is an automated email from the ASF dual-hosted git repository. hxb pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 08b2048 [FLINK-23347][docs][datatream] Updated operators windowAll description add df4fa8a [FLINK-22865][python] Optimize state serialize/deserialize in PyFlink No new revisions were added by this update. Summary of changes: flink-python/pyflink/common/serializer.py | 4 + flink-python/pyflink/datastream/data_stream.py | 7 +- flink-python/pyflink/datastream/state.py | 26 + .../pyflink/datastream/tests/test_data_stream.py | 35 +++ flink-python/pyflink/datastream/window.py | 22 ++-- .../fn_execution/beam/beam_coder_impl_fast.pyx | 8 +- .../fn_execution/beam/beam_coder_impl_slow.py | 5 +- .../pyflink/fn_execution/beam/beam_coders.py | 29 +- .../fn_execution/beam/beam_operations_fast.pyx | 2 +- .../beam/{beam_stream.pxd => beam_stream_fast.pxd} | 0 .../beam/{beam_stream.pyx => beam_stream_fast.pyx} | 0 .../pyflink/fn_execution/beam/beam_stream_slow.py | 25 +++-- .../pyflink/fn_execution/coder_impl_fast.pxd | 9 +- .../pyflink/fn_execution/coder_impl_fast.pyx | 45 - .../pyflink/fn_execution/coder_impl_slow.py| 51 +- flink-python/pyflink/fn_execution/coders.py| 111 - .../pyflink/fn_execution/datastream/operations.py | 4 +- .../fn_execution/datastream/runtime_context.py | 25 +++-- .../datastream/window/merging_window_set.py| 10 +- .../datastream/window/window_operator.py | 12 +-- flink-python/pyflink/fn_execution/state_impl.py| 2 + flink-python/pyflink/fn_execution/stream_slow.py | 3 +- .../pyflink/fn_execution/table/aggregate_fast.pyx | 15 ++- .../pyflink/fn_execution/table/aggregate_slow.py | 15 ++- .../pyflink/fn_execution/table/operations.py | 4 +- .../pyflink/fn_execution/table/state_data_view.py | 4 +- .../fn_execution/table/window_aggregate_fast.pyx | 12 +-- .../fn_execution/table/window_aggregate_slow.py| 12 +-- .../pyflink/fn_execution/table/window_assigner.py | 5 +- 
.../pyflink/fn_execution/table/window_context.py | 15 ++- .../pyflink/fn_execution/table/window_trigger.py | 2 +- flink-python/setup.py | 8 +- 32 files changed, 336 insertions(+), 191 deletions(-) rename flink-python/pyflink/fn_execution/beam/{beam_stream.pxd => beam_stream_fast.pxd} (100%) rename flink-python/pyflink/fn_execution/beam/{beam_stream.pyx => beam_stream_fast.pyx} (100%) copy flink-dist/src/main/flink-bin/conf/zoo.cfg => flink-python/pyflink/fn_execution/beam/beam_stream_slow.py (65%)
[flink] branch release-1.13.2-rc2 created (now acaad85)
This is an automated email from the ASF dual-hosted git repository. tangyun pushed a change to branch release-1.13.2-rc2 in repository https://gitbox.apache.org/repos/asf/flink.git. at acaad85 Commit for release 1.13.2 This branch includes the following new commits: new acaad85 Commit for release 1.13.2 The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[flink] 01/01: Commit for release 1.13.2
This is an automated email from the ASF dual-hosted git repository. tangyun pushed a commit to branch release-1.13.2-rc2 in repository https://gitbox.apache.org/repos/asf/flink.git commit acaad856ee68422130719bd44cb204da45d130ab Author: Yun Tang AuthorDate: Wed Jul 14 10:38:57 2021 +0800 Commit for release 1.13.2 --- docs/config.toml | 2 +- flink-annotations/pom.xml | 2 +- flink-clients/pom.xml | 2 +- flink-connectors/flink-connector-base/pom.xml | 2 +- flink-connectors/flink-connector-cassandra/pom.xml| 2 +- flink-connectors/flink-connector-elasticsearch-base/pom.xml | 2 +- flink-connectors/flink-connector-elasticsearch5/pom.xml | 2 +- flink-connectors/flink-connector-elasticsearch6/pom.xml | 2 +- flink-connectors/flink-connector-elasticsearch7/pom.xml | 2 +- flink-connectors/flink-connector-files/pom.xml| 2 +- flink-connectors/flink-connector-gcp-pubsub/pom.xml | 2 +- flink-connectors/flink-connector-hbase-1.4/pom.xml| 2 +- flink-connectors/flink-connector-hbase-2.2/pom.xml| 2 +- flink-connectors/flink-connector-hbase-base/pom.xml | 2 +- flink-connectors/flink-connector-hive/pom.xml | 2 +- flink-connectors/flink-connector-jdbc/pom.xml | 2 +- flink-connectors/flink-connector-kafka/pom.xml| 2 +- flink-connectors/flink-connector-kinesis/pom.xml | 2 +- flink-connectors/flink-connector-nifi/pom.xml | 2 +- flink-connectors/flink-connector-rabbitmq/pom.xml | 2 +- flink-connectors/flink-connector-twitter/pom.xml | 2 +- flink-connectors/flink-file-sink-common/pom.xml | 2 +- flink-connectors/flink-hadoop-compatibility/pom.xml | 2 +- flink-connectors/flink-hcatalog/pom.xml | 2 +- flink-connectors/flink-sql-connector-elasticsearch6/pom.xml | 2 +- flink-connectors/flink-sql-connector-elasticsearch7/pom.xml | 2 +- flink-connectors/flink-sql-connector-hbase-1.4/pom.xml| 2 +- flink-connectors/flink-sql-connector-hbase-2.2/pom.xml| 2 +- flink-connectors/flink-sql-connector-hive-1.2.2/pom.xml | 2 +- flink-connectors/flink-sql-connector-hive-2.2.0/pom.xml | 2 +- 
flink-connectors/flink-sql-connector-hive-2.3.6/pom.xml | 2 +- flink-connectors/flink-sql-connector-hive-3.1.2/pom.xml | 2 +- flink-connectors/flink-sql-connector-kafka/pom.xml| 2 +- flink-connectors/flink-sql-connector-kinesis/pom.xml | 2 +- flink-connectors/pom.xml | 2 +- flink-container/pom.xml | 2 +- flink-contrib/flink-connector-wikiedits/pom.xml | 2 +- flink-contrib/pom.xml | 2 +- flink-core/pom.xml| 2 +- flink-dist/pom.xml| 2 +- flink-docs/pom.xml| 2 +- flink-end-to-end-tests/flink-batch-sql-test/pom.xml | 2 +- flink-end-to-end-tests/flink-cli-test/pom.xml | 2 +- flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml| 2 +- .../flink-connector-gcp-pubsub-emulator-tests/pom.xml | 2 +- flink-end-to-end-tests/flink-dataset-allround-test/pom.xml| 2 +- .../flink-dataset-fine-grained-recovery-test/pom.xml | 2 +- flink-end-to-end-tests/flink-datastream-allround-test/pom.xml | 2 +- flink-end-to-end-tests/flink-distributed-cache-via-blob-test/pom.xml | 2 +- flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml | 2 +- flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml | 2 +- flink-end-to-end-tests/flink-elasticsearch7-test/pom.xml | 2 +- flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml| 2 +- flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml | 2 +- flink-end-to-end-tests/flink-end-to-end-tests-hbase/pom.xml | 2 +- flink-end-to-end-tests/flink-file-sink-test/pom.xml | 2 +- flink-end-to-end-tests/flink-glue-schema-registry-test/pom.xml| 2 +- flink-end-to-end-tests/flink-heavy-deployment-stress-test/pom.xml | 2 +- flink-end-to-end-tests/flink-high-parallelism-iterations-test/pom.xml | 2 +- .../flink-local-recovery-and-allocation-test/pom.xml