[flink] branch master updated (073f9f3f -> 3671316)

2021-07-21 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 073f9f3f [FLINK-23455][e2e] Remove the usage of the sql-client YAML 
file in SQLJobSubmission
 add 3671316  [FLINK-23446][e2e] Refactor SQL Client end to end tests to 
replace YAML file with SQL DDL

No new revisions were added by this update.

Summary of changes:
 .../test-scripts/kafka_sql_common.sh   |  64 +++--
 .../test-scripts/test_sql_client.sh| 102 -
 2 files changed, 53 insertions(+), 113 deletions(-)


[flink] branch master updated (6709573 -> 073f9f3f)

2021-07-21 Thread jark

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 6709573  [FLINK-23204] Provide StateBackends access to MailboxExecutor 
(#16531)
 add 073f9f3f [FLINK-23455][e2e] Remove the usage of the sql-client YAML 
file in SQLJobSubmission

No new revisions were added by this update.

Summary of changes:
 .../flink/tests/util/flink/FlinkContainer.java | 14 -
 .../flink/tests/util/flink/FlinkDistribution.java  | 12 
 .../flink/tests/util/flink/SQLJobSubmission.java   | 33 ++
 3 files changed, 2 insertions(+), 57 deletions(-)


[flink] branch master updated (a26887e -> 6709573)

2021-07-21 Thread yuanmei

yuanmei pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from a26887e  [FLINK-23434][table-planner] Fix the inconsistent type in 
IncrementalAggregateRule when the query has one distinct agg function and count 
star agg function
 add 6709573  [FLINK-23204] Provide StateBackends access to MailboxExecutor 
(#16531)

No new revisions were added by this update.

Summary of changes:
 .../flink/runtime/execution/Environment.java   | 18 
 .../flink/runtime/mailbox}/MailboxExecutor.java| 10 +++
 .../runtime/taskmanager/RuntimeEnvironment.java| 31 
 .../runtime/mailbox}/SyncMailboxExecutor.java  |  2 +-
 .../operators/testutils/MockEnvironment.java   | 34 ++
 .../changelog/ChangelogKeyedStateBackend.java  | 12 +++-
 .../state/changelog/ChangelogStateBackend.java |  4 ++-
 .../source/ContinuousFileReaderOperator.java   |  2 +-
 .../ContinuousFileReaderOperatorFactory.java   |  2 +-
 .../api/operators/StreamOperatorFactoryUtil.java   |  1 +
 .../api/operators/YieldingOperatorFactory.java |  1 +
 .../api/operators/async/AsyncWaitOperator.java |  2 +-
 .../operators/async/AsyncWaitOperatorFactory.java  |  2 +-
 .../io/checkpointing/CheckpointedInputGate.java|  2 +-
 .../io/checkpointing/InputProcessorUtil.java   |  2 +-
 .../tasks/ProcessingTimeServiceFactory.java|  2 +-
 .../runtime/tasks/StreamOperatorWrapper.java   |  2 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  |  5 +++-
 .../tasks/mailbox/MailboxExecutorFactory.java  |  2 +-
 .../runtime/tasks/mailbox/MailboxExecutorImpl.java |  2 +-
 .../runtime/tasks/mailbox/MailboxProcessor.java|  2 +-
 .../runtime/tasks/mailbox/TaskMailbox.java |  2 +-
 .../api/graph/StreamingJobGraphGeneratorTest.java  |  2 +-
 .../runtime/io/StreamTaskNetworkInputTest.java |  2 +-
 .../AlignedCheckpointsMassiveRandomTest.java   |  2 +-
 .../io/checkpointing/AlignedCheckpointsTest.java   |  2 +-
 .../checkpointing/AlternatingCheckpointsTest.java  |  2 +-
 .../CheckpointBarrierTrackerTest.java  |  2 +-
 .../io/checkpointing/InputProcessorUtilTest.java   |  2 +-
 .../io/checkpointing/UnalignedCheckpointsTest.java |  2 +-
 .../runtime/operators/MailboxOperatorTest.java |  2 +-
 .../operators/StreamTaskOperatorTimerTest.java |  2 +-
 .../runtime/tasks/StreamOperatorWrapperTest.java   |  2 +-
 .../tasks/StreamTaskMailboxTestHarness.java|  2 +-
 .../streaming/runtime/tasks/StreamTaskTest.java|  2 +-
 .../runtime/tasks/StreamTaskTestHarness.java   |  2 +-
 .../tasks/mailbox/MailboxExecutorImplTest.java |  2 +-
 .../tasks/mailbox/TaskMailboxProcessorTest.java|  2 +-
 .../util/TestCheckpointedInputGateBuilder.java |  4 +--
 39 files changed, 138 insertions(+), 40 deletions(-)
 rename 
{flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators => 
flink-runtime/src/main/java/org/apache/flink/runtime/mailbox}/MailboxExecutor.java
 (97%)
 rename 
{flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators => 
flink-runtime/src/test/java/org/apache/flink/runtime/mailbox}/SyncMailboxExecutor.java
 (97%)
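The MailboxExecutor moved into flink-runtime by this commit implements Flink's mailbox threading model: work items are enqueued as "mails" and drained strictly in order by a single thread. A minimal sketch of that pattern (illustrative Python; the Mailbox class and its method names are invented here, not Flink's API):

```python
from collections import deque

class Mailbox:
    """Toy single-threaded mailbox: actions run strictly in enqueue order."""

    def __init__(self):
        self._mails = deque()

    def execute(self, action, description):
        # Enqueue a closure; nothing runs until the mailbox loop drains it.
        self._mails.append((action, description))

    def drain(self):
        processed = []
        while self._mails:
            action, description = self._mails.popleft()
            action()  # runs on the single mailbox thread
            processed.append(description)
        return processed

box = Mailbox()
results = []
box.execute(lambda: results.append("checkpoint"), "trigger checkpoint")
box.execute(lambda: results.append("timer"), "fire timer")
processed = box.drain()
# results == ["checkpoint", "timer"]
```

The point of the pattern is that enqueued actions never race with the main processing loop, since a single thread drains them; giving state backends access to the executor lets them schedule work onto that same thread.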


[flink] branch master updated (a123091 -> a26887e)

2021-07-21 Thread godfrey

godfrey pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from a123091  [FLINK-23401][python] Separate data and timer processing into 
different channels
 add a26887e  [FLINK-23434][table-planner] Fix the inconsistent type in 
IncrementalAggregateRule when the query has one distinct agg function and count 
star agg function

No new revisions were added by this update.

Summary of changes:
 .../stream/StreamExecGlobalGroupAggregate.java |  17 +
 .../StreamPhysicalGlobalGroupAggregate.scala   |  19 +-
 .../physical/stream/IncrementalAggregateRule.scala |  21 +-
 .../table/planner/plan/utils/AggregateUtil.scala   |  33 +
 .../stream/IncrementalAggregateJsonPlanTest.java   |  21 +
 ...AggregateWithSumCountDistinctAndRetraction.out} | 662 -
 .../plan/stream/sql/agg/DistinctAggregateTest.xml  | 179 +-
 .../stream/sql/agg/IncrementalAggregateTest.xml|  41 +-
 .../stream/sql/agg/DistinctAggregateTest.scala |  16 +
 .../runtime/stream/sql/SplitAggregateITCase.scala  |  52 ++
 10 files changed, 744 insertions(+), 317 deletions(-)
 copy 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/{GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out
 => 
IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out}
 (56%)


[flink] 01/03: [FLINK-23401][python] Refactor the construction of transformation into getTransforms

2021-07-21 Thread dianfu

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a653c5acbe6ecd347ffbe3686eb324f88c0596a
Author: Dian Fu 
AuthorDate: Fri Jul 2 15:12:09 2021 +0800

[FLINK-23401][python] Refactor the construction of transformation into 
getTransforms

This closes #16541.
---
 .../java/org/apache/flink/python/Constants.java|  8 
 .../beam/BeamDataStreamPythonFunctionRunner.java   | 35 --
 .../python/beam/BeamPythonFunctionRunner.java  | 56 +++---
 .../python/beam/BeamTablePythonFunctionRunner.java | 31 ++--
 4 files changed, 85 insertions(+), 45 deletions(-)

diff --git a/flink-python/src/main/java/org/apache/flink/python/Constants.java 
b/flink-python/src/main/java/org/apache/flink/python/Constants.java
index 54d1d06..555ccc2 100644
--- a/flink-python/src/main/java/org/apache/flink/python/Constants.java
+++ b/flink-python/src/main/java/org/apache/flink/python/Constants.java
@@ -27,4 +27,12 @@ public class Constants {
 
 // coder urns
 public static final String FLINK_CODER_URN = "flink:coder:v1";
+
+// execution graph
+public static final String TRANSFORM_ID = "transform";
+public static final String MAIN_INPUT_NAME = "input";
+public static final String MAIN_OUTPUT_NAME = "output";
+
+public static final String INPUT_COLLECTION_ID = "input";
+public static final String OUTPUT_COLLECTION_ID = "output";
 }
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
index 5024ed0..a9f5b91 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
@@ -25,11 +25,21 @@ import org.apache.flink.python.env.PythonEnvironmentManager;
 import org.apache.flink.python.metric.FlinkMetricContainer;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.Map;
 
+import static org.apache.flink.python.Constants.INPUT_COLLECTION_ID;
+import static org.apache.flink.python.Constants.MAIN_INPUT_NAME;
+import static org.apache.flink.python.Constants.MAIN_OUTPUT_NAME;
+import static org.apache.flink.python.Constants.OUTPUT_COLLECTION_ID;
+import static org.apache.flink.python.Constants.TRANSFORM_ID;
+
 /**
  * {@link BeamDataStreamPythonFunctionRunner} is responsible for starting a 
beam python harness to
  * execute user defined python function.
@@ -37,6 +47,7 @@ import java.util.Map;
 @Internal
 public class BeamDataStreamPythonFunctionRunner extends 
BeamPythonFunctionRunner {
 
+private final String functionUrn;
 private final FlinkFnApi.UserDefinedDataStreamFunction 
userDefinedDataStreamFunction;
 
 public BeamDataStreamPythonFunctionRunner(
@@ -56,7 +67,6 @@ public class BeamDataStreamPythonFunctionRunner extends 
BeamPythonFunctionRunner
 super(
 taskName,
 environmentManager,
-functionUrn,
 jobOptions,
 flinkMetricContainer,
 stateBackend,
@@ -66,11 +76,28 @@ public class BeamDataStreamPythonFunctionRunner extends 
BeamPythonFunctionRunner
 managedMemoryFraction,
 inputCoderDescriptor,
 outputCoderDescriptor);
-this.userDefinedDataStreamFunction = userDefinedDataStreamFunction;
+this.functionUrn = Preconditions.checkNotNull(functionUrn);
+this.userDefinedDataStreamFunction =
+Preconditions.checkNotNull(userDefinedDataStreamFunction);
 }
 
 @Override
-protected byte[] getUserDefinedFunctionsProtoBytes() {
-return this.userDefinedDataStreamFunction.toByteArray();
+protected Map<String, RunnerApi.PTransform> getTransforms() {
+return Collections.singletonMap(
+TRANSFORM_ID,
+RunnerApi.PTransform.newBuilder()
+.setUniqueName(TRANSFORM_ID)
+.setSpec(
+RunnerApi.FunctionSpec.newBuilder()
+.setUrn(functionUrn)
+.setPayload(
+
org.apache.beam.vendor.grpc.v1p26p0.com.google
+
.protobuf.ByteString.copyFrom(
+
userDefinedDataStreamFunction
+  

[flink] branch master updated (be82c54 -> a123091)

2021-07-21 Thread dianfu

dianfu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from be82c54  [hotfix][coordination] Provide richer exception message for 
suppressed exceptions during coordinator failover
 new 1a653c5  [FLINK-23401][python] Refactor the construction of 
transformation into getTransforms
 new 9df4b76  [FLINK-23401][python] Use ParDoPayload to represent the 
user-defined functions in Python DataStream API
 new a123091  [FLINK-23401][python] Separate data and timer processing into 
different channels

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../fn_execution/beam/beam_coder_impl_fast.pyx |   4 +-
 .../pyflink/fn_execution/beam/beam_operations.py   |  36 ++---
 .../fn_execution/beam/beam_operations_fast.pxd |   5 +-
 .../fn_execution/beam/beam_operations_fast.pyx |  31 +++--
 .../fn_execution/beam/beam_operations_slow.py  |  12 ++
 .../pyflink/fn_execution/beam/beam_stream_fast.pxd |   2 +-
 .../pyflink/fn_execution/beam/beam_stream_fast.pyx |  22 ++-
 .../pyflink/fn_execution/coder_impl_fast.pyx   |   3 +-
 .../pyflink/fn_execution/coder_impl_slow.py| 116 
 .../fn_execution/datastream/input_handler.py   | 114 
 .../pyflink/fn_execution/datastream/operations.py  | 107 +++
 .../fn_execution/datastream/output_handler.py  |  63 -
 .../fn_execution/datastream/timerservice_impl.py   | 129 ++
 .../datastream/window/window_operator.py   |  22 +--
 .../pyflink/fn_execution/table/operations.py   |   2 +-
 .../fn_execution/table/window_aggregate_fast.pyx   |   6 +-
 .../fn_execution/table/window_aggregate_slow.py|   6 +-
 .../pyflink/fn_execution/table/window_context.py   |   7 +-
 .../java/org/apache/flink/python/Constants.java|  13 ++
 .../apache/flink/python/PythonFunctionRunner.java  |   3 +
 .../python/OneInputPythonFunctionOperator.java |   4 +-
 .../python/PythonKeyedCoProcessOperator.java   | 147 ++---
 .../python/PythonKeyedProcessOperator.java | 116 +---
 .../python/TwoInputPythonFunctionOperator.java |   4 +-
 .../python/collector/RunnerOutputCollector.java|  60 +
 .../api/operators/python/timer/TimerHandler.java   |  65 +
 .../python/timer/TimerRegistration.java}   | 118 +
 .../api/operators/python/timer/TimerUtils.java |  45 +++
 .../beam/BeamDataStreamPythonFunctionRunner.java   |  98 +-
 .../python/beam/BeamPythonFunctionRunner.java  | 128 --
 .../flink/streaming/api/utils/PythonTypeUtils.java |  54 +---
 .../utils/input/KeyedInputWithTimerRowFactory.java |  85 
 .../input/KeyedTwoInputWithTimerRowFactory.java|  87 
 .../flink/streaming/api/utils/input/TimerType.java |  31 -
 .../api/utils/output/TimerOperandType.java |  54 
 .../python/beam/BeamTablePythonFunctionRunner.java |  50 ++-
 .../flink/table/runtime/utils/PythonTestUtils.java |  19 ++-
 37 files changed, 1052 insertions(+), 816 deletions(-)
 delete mode 100644 
flink-python/pyflink/fn_execution/datastream/output_handler.py
 create mode 100644 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/collector/RunnerOutputCollector.java
 create mode 100644 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerHandler.java
 rename 
flink-python/src/main/java/org/apache/flink/streaming/api/{utils/output/OutputWithTimerRowHandler.java
 => operators/python/timer/TimerRegistration.java} (53%)
 create mode 100644 
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java
 delete mode 100644 
flink-python/src/main/java/org/apache/flink/streaming/api/utils/input/KeyedInputWithTimerRowFactory.java
 delete mode 100644 
flink-python/src/main/java/org/apache/flink/streaming/api/utils/input/KeyedTwoInputWithTimerRowFactory.java
 delete mode 100644 
flink-python/src/main/java/org/apache/flink/streaming/api/utils/input/TimerType.java
 delete mode 100644 
flink-python/src/main/java/org/apache/flink/streaming/api/utils/output/TimerOperandType.java


[flink] 02/03: [FLINK-23401][python] Use ParDoPayload to represent the user-defined functions in Python DataStream API

2021-07-21 Thread dianfu

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9df4b7608aea73d7b2ad3fc6aeee559a84e58c0b
Author: Dian Fu 
AuthorDate: Fri Jul 2 16:03:23 2021 +0800

[FLINK-23401][python] Use ParDoPayload to represent the user-defined 
functions in Python DataStream API

This closes #16541.
---
 .../pyflink/fn_execution/beam/beam_operations.py   | 36 +-
 .../beam/BeamDataStreamPythonFunctionRunner.java   | 24 ---
 2 files changed, 40 insertions(+), 20 deletions(-)

diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations.py 
b/flink-python/pyflink/fn_execution/beam/beam_operations.py
index 6961d7c..04fc5d4 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations.py
@@ -15,8 +15,10 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 

-
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners.worker import bundle_processor, operation_specs
+from apache_beam.utils import proto_utils
 
 from pyflink.fn_execution import flink_fn_execution_pb2
 from pyflink.fn_execution.coders import from_proto, from_type_info_proto, 
TimeWindowCoder, \
@@ -113,25 +115,29 @@ def create_pandas_over_window_aggregate_function(
 table_operations.PandasBatchOverWindowAggregateFunctionOperation)
 
 
-@bundle_processor.BeamTransformFactory.register_urn(
-datastream_operations.DATA_STREAM_STATELESS_FUNCTION_URN,
-flink_fn_execution_pb2.UserDefinedDataStreamFunction)
-def create_data_stream_function(factory, transform_id, transform_proto, 
parameter, consumers):
-return _create_user_defined_function_operation(
-factory, transform_proto, consumers, parameter,
-beam_operations.StatelessFunctionOperation,
-datastream_operations.StatelessOperation)
+# - DataStream 
 
 
 @bundle_processor.BeamTransformFactory.register_urn(
-datastream_operations.DATA_STREAM_STATEFUL_FUNCTION_URN,
-flink_fn_execution_pb2.UserDefinedDataStreamFunction)
+common_urns.primitives.PAR_DO.urn, beam_runner_api_pb2.ParDoPayload)
 def create_data_stream_keyed_process_function(factory, transform_id, 
transform_proto, parameter,
   consumers):
-return _create_user_defined_function_operation(
-factory, transform_proto, consumers, parameter,
-beam_operations.StatefulFunctionOperation,
-datastream_operations.StatefulOperation)
+urn = parameter.do_fn.urn
+payload = proto_utils.parse_Bytes(
+parameter.do_fn.payload, 
flink_fn_execution_pb2.UserDefinedDataStreamFunction)
+if urn == datastream_operations.DATA_STREAM_STATELESS_FUNCTION_URN:
+return _create_user_defined_function_operation(
+factory, transform_proto, consumers, payload,
+beam_operations.StatelessFunctionOperation,
+datastream_operations.StatelessOperation)
+else:
+return _create_user_defined_function_operation(
+factory, transform_proto, consumers, payload,
+beam_operations.StatefulFunctionOperation,
+datastream_operations.StatefulOperation)
+
+
+# - Utilities 
 
 
 def _create_user_defined_function_operation(factory, transform_proto, 
consumers, udfs_proto,
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
index a9f5b91..defb696 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.BeamUrns;
 
 import javax.annotation.Nullable;
 
@@ -83,11 +84,10 @@ public class BeamDataStreamPythonFunctionRunner extends 
BeamPythonFunctionRunner
 
 @Override
 protected Map<String, RunnerApi.PTransform> getTransforms() {
-return Collections.singletonMap(
-TRANSFORM_ID,
-RunnerApi.PTransform.newBuilder()
-.setUniqueName(TRANSFORM_ID)
-.setSpec(
+// Use ParDoPayload as a wrapper of the actual payload as timer is 
only supported in ParDo
+RunnerApi.ParDoPayload payload =
+
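The commit above replaces per-URN factory registration with a single handler for Beam's generic PAR_DO primitive that unwraps the ParDoPayload and branches on the inner URN. Stripped of the Beam machinery, the dispatch shape is roughly this (pure-Python sketch; the URN strings and function names are placeholders, not the real constants):

```python
from types import SimpleNamespace

# Placeholder URNs standing in for the DATA_STREAM_*_FUNCTION_URN constants.
STATELESS_URN = "flink:transform:ds:stateless:v1"
STATEFUL_URN = "flink:transform:ds:stateful:v1"

def create_data_stream_function(par_do_payload):
    """Single ParDo entry point: branch on the URN wrapped inside the
    payload, mirroring the refactored factory in beam_operations.py."""
    inner_urn = par_do_payload.do_fn.urn
    if inner_urn == STATELESS_URN:
        return "StatelessOperation"
    return "StatefulOperation"

# A stand-in for the deserialized ParDoPayload proto.
payload = SimpleNamespace(do_fn=SimpleNamespace(urn=STATEFUL_URN, payload=b""))
create_data_stream_function(payload)  # -> "StatefulOperation"
```

Wrapping everything in ParDo is what makes timers available, since Beam only supports timers on the ParDo primitive; the original Flink URN survives inside the payload for dispatch.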

[flink] branch master updated (b34ef1e -> be82c54)

2021-07-21 Thread sewen

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from b34ef1e  [FLINK-18783] Load Akka with separate classloader
 add cc5c9c0  [hotfix][tests] Fix raw generics in 
OperatorEventSendingCheckpointITCase
 add 798003c  [hotfix][coordination] Reduce log level for suppressed 
failures from WARN to DEBUG.
 add be82c54  [hotfix][coordination] Provide richer exception message for 
suppressed exceptions during coordinator failover

No new revisions were added by this update.

Summary of changes:
 .../coordination/OperatorCoordinatorHolder.java | 17 +
 .../OperatorEventSendingCheckpointITCase.java   |  9 ++---
 2 files changed, 15 insertions(+), 11 deletions(-)


[flink] branch benchmark-request updated (cffa1ae -> 36e0224)

2021-07-21 Thread chesnay

chesnay pushed a change to branch benchmark-request
in repository https://gitbox.apache.org/repos/asf/flink.git.


 discard cffa1ae  Bump lz4-java to 1.8.0
 add ed39fb2  [FLINK-23395] Bump Okhttp to 3.14.9
 new a2ad139  [FLINK-23329][build] Bump flink-shaded to 14.0
 new 36e0224  Adjust guava package

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (cffa1ae)
\
 N -- N -- N   refs/heads/benchmark-request (36e0224)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
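The "discard"/"new" bookkeeping described above is plain reachability arithmetic: a revision is discarded when it is reachable from the old tip but not from the new one. A toy model (pure Python; the commit graph below is invented, with B as the common base):

```python
# Parent map of a tiny history: B is the common base,
# O1..O3 were reachable from the old tip, N1..N3 from the force-pushed tip.
parents = {
    "B": [], "O1": ["B"], "O2": ["O1"], "O3": ["O2"],
    "N1": ["B"], "N2": ["N1"], "N3": ["N2"],
}

def reachable(tip):
    """All commits reachable from tip by following parent edges."""
    seen, stack = set(), [tip]
    while stack:
        c = stack.pop()
        if c not in seen:
            seen.add(c)
            stack.extend(parents[c])
    return seen

old_tip, new_tip = "O3", "N3"
discarded = reachable(old_tip) - reachable(new_tip)  # gone unless another ref holds them
new_revs = reachable(new_tip) - reachable(old_tip)   # described in separate emails
# discarded == {"O1", "O2", "O3"}; new_revs == {"N1", "N2", "N3"}
```

git computes the same sets directly: `git rev-list 36e0224..cffa1ae` lists the discarded O revisions and `git rev-list cffa1ae..36e0224` the new N revisions.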


Summary of changes:
 .../application/JarManifestParserTest.java |  4 +--
 .../cassandra/CassandraInputFormatBase.java|  2 +-
 .../source/HBaseRowDataAsyncLookupFunction.java|  4 +--
 .../connector/hbase2/HBaseConnectorITCase.java |  2 +-
 .../HBaseRowDataAsyncLookupFunctionTest.java   |  2 +-
 .../hbase/source/HBaseRowDataLookupFunction.java   |  4 +--
 .../connectors/hive/FlinkEmbeddedHiveRunner.java   |  2 +-
 .../connectors/hive/FlinkStandaloneHiveRunner.java |  4 +--
 .../connector/jdbc/table/JdbcLookupFunction.java   |  4 +--
 .../jdbc/table/JdbcRowDataLookupFunction.java  |  4 +--
 .../connectors/kafka/FlinkKafkaProducer.java   |  2 +-
 .../internals/FlinkKafkaInternalProducer.java  |  2 +-
 .../kafka/FlinkKafkaInternalProducerITCase.java|  2 +-
 .../connectors/kafka/KafkaConsumerTestBase.java|  2 +-
 .../kafka/shuffle/KafkaShuffleITCase.java  |  4 +--
 .../kinesis/testutils/KinesisPubsubClient.java |  2 +-
 ...inesisDataFetcherForShardConsumerException.java |  2 +-
 .../GuavaFlinkConnectorRateLimiter.java|  2 +-
 .../apache/flink/configuration/CoreOptions.java|  4 +--
 .../core/classloading/ComponentClassLoader.java|  2 +-
 .../java/org/apache/flink/core/fs/FileSystem.java  |  8 +++---
 .../flink/core/plugin/DefaultPluginManager.java|  2 +-
 .../main/java/org/apache/flink/util/FileUtils.java |  4 +--
 .../flink/configuration/ConfigOptionTest.java  |  2 +-
 .../apache/flink/core/plugin/PluginConfigTest.java |  2 +-
 flink-dist/pom.xml |  2 +-
 flink-dist/src/main/resources/META-INF/NOTICE  |  2 +-
 .../org/apache/flink/dist/BashJavaUtilsITCase.java |  2 +-
 .../flink-end-to-end-tests-common/pom.xml  |  1 -
 .../flink-glue-schema-registry-test/pom.xml|  2 ++
 .../flink-metrics-reporter-prometheus-test/pom.xml |  1 -
 .../tests/queryablestate/QsStateProducer.java  |  2 +-
 .../kinesis/test/KinesisTableApiITCase.java|  2 +-
 .../flink-avro-glue-schema-registry/pom.xml|  2 ++
 flink-kubernetes/pom.xml   | 18 +---
 .../decorators/FlinkConfMountDecorator.java|  2 +-
 .../decorators/KerberosMountDecorator.java |  2 +-
 .../decorators/PodTemplateMountDecorator.java  |  2 +-
 .../src/main/resources/META-INF/NOTICE |  6 ++--
 .../flink/kubernetes/KubernetesTestUtils.java  |  2 +-
 .../flink/cep/nfa/sharedbuffer/SharedBuffer.java   |  2 +-
 .../apache/flink/cep/nfa/AfterMatchSkipITCase.java |  2 +-
 .../org/apache/flink/cep/nfa/GreedyITCase.java |  2 +-
 .../java/org/apache/flink/cep/nfa/GroupITCase.java |  2 +-
 .../flink/cep/nfa/IterativeConditionsITCase.java   |  2 +-
 .../java/org/apache/flink/cep/nfa/NFAITCase.java   |  2 +-
 .../org/apache/flink/cep/nfa/NotPatternITCase.java |  2 +-
 .../apache/flink/cep/nfa/SameElementITCase.java|  4 +--
 .../apache/flink/cep/nfa/TimesOrMoreITCase.java|  2 +-
 .../org/apache/flink/cep/nfa/TimesRangeITCase.java |  2 +-
 .../apache/flink/cep/nfa/UntilConditionITCase.java |  2 +-
 .../flink/cep/nfa/compiler/NFACompilerTest.java|  2 +-
 .../apache/flink/cep/operator/CEPOperatorTest.java |  2 +-
 .../apache/flink/cep/utils/TestSharedBuffer.java   |  4 +--
 .../state/api/runtime/OperatorIDGenerator.java |  2 +-
 flink-metrics/flink-metrics-datadog/pom.xml| 10 ++-
 .../src/main/resources/META-INF/NOTICE |  4 +--
 .../src/main/resources/META-INF/NOTICE |  4 +--
 .../flink/optimizer/dag/SingleInputNode.java   |  2 +-
 .../apache/flink/optimizer/dag/TwoInputNode.java   |  2 +-
 

[flink] 01/02: [FLINK-23329][build] Bump flink-shaded to 14.0

2021-07-21 Thread chesnay

chesnay pushed a commit to branch benchmark-request
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a2ad139d0bf0dd62a57b8563888e73f794e0b1f2
Author: Chesnay Schepler 
AuthorDate: Wed Jul 7 12:05:53 2021 +0200

[FLINK-23329][build] Bump flink-shaded to 14.0
---
 flink-dist/pom.xml |  2 +-
 .../java/org/apache/flink/cep/utils/TestSharedBuffer.java  |  4 +---
 .../flink/yarn/YARNSessionCapacitySchedulerITCase.java |  2 +-
 pom.xml| 14 +++---
 4 files changed, 10 insertions(+), 12 deletions(-)

diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 0355f27..b88b92b 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -34,7 +34,7 @@ under the License.
jar
 

-   3.5.6
+   3.5.9

 

diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
index e133c44..8a5df5b 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
@@ -32,8 +32,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
-
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -207,7 +205,7 @@ public class TestSharedBuffer<V> extends SharedBuffer<V> {
 @Override
 public Iterator<Map.Entry<K, V>> iterator() throws Exception 
{
 if (values == null) {
-return Iterators.emptyIterator();
+return Collections.emptyIterator();
 }
 
 return new 
CountingIterator<>(values.entrySet().iterator());
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index e5ae45d..2f3e5dd 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -322,7 +322,7 @@ public class YARNSessionCapacitySchedulerITCase extends 
YarnTestBase {
 try {
 final String logs = outContent.toString();
 final HostAndPort hostAndPort = 
parseJobManagerHostname(logs);
-final String host = hostAndPort.getHostText();
+final String host = hostAndPort.getHost();
 final int port = hostAndPort.getPort();
 LOG.info("Extracted hostname:port: {}:{}", host, port);
 
diff --git a/pom.xml b/pom.xml
index c2dbb32..0068d38 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,7 +100,8 @@ under the License.
 to avoid process kills due to container limits on 
TravisCI -->

${flink.forkCount}
true
-   13.0
+   14.0
+   
2.12.4
18.0
2.5.21
1.8
@@ -120,7 +121,6 @@ under the License.
3.4.14

2.12.0
-   2.12.1
0.8.1
1.10.0

1.2.0
@@ -281,31 +281,31 @@ under the License.

org.apache.flink
flink-shaded-guava
-   18.0-${flink.shaded.version}
+   
30.1.1-jre-${flink.shaded.version}

 

org.apache.flink
flink-shaded-jackson
-   
${jackson.version}-${flink.shaded.version}
+   
${flink.shaded.jackson.version}-${flink.shaded.version}

 

org.apache.flink

flink-shaded-jackson-module-jsonSchema
-   
${jackson.version}-${flink.shaded.version}
+   
${flink.shaded.jackson.version}-${flink.shaded.version}

 

org.apache.flink
flink-shaded-netty
-   
4.1.49.Final-${flink.shaded.version}
+   

[flink] branch master updated (f9286a5 -> b34ef1e)

2021-07-21 Thread chesnay

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from f9286a5  [FLINK-23429][state-processor-api] Use Path instead of 
Path.getPath() to preserve FileSystem info
 add b34ef1e  [FLINK-18783] Load Akka with separate classloader

No new revisions were added by this update.

Summary of changes:
 .../resource-providers/standalone/kubernetes.md|   4 +-
 .../resource-providers/standalone/kubernetes.md|   4 +-
 flink-clients/pom.xml  |   6 +-
 flink-connectors/flink-connector-base/pom.xml  |   2 +-
 flink-connectors/flink-connector-cassandra/pom.xml |   2 +-
 .../flink-connector-elasticsearch-base/pom.xml |   2 +-
 .../flink-connector-elasticsearch5/pom.xml |   2 +-
 .../flink-connector-elasticsearch6/pom.xml |   2 +-
 .../flink-connector-elasticsearch7/pom.xml |   2 +-
 flink-connectors/flink-connector-files/pom.xml |   2 +-
 .../flink-connector-gcp-pubsub/pom.xml |   2 +-
 flink-connectors/flink-connector-hbase-1.4/pom.xml |   2 +-
 flink-connectors/flink-connector-hive/pom.xml  |   2 +-
 flink-connectors/flink-connector-jdbc/pom.xml  |   2 +-
 flink-connectors/flink-connector-kafka/pom.xml |   4 +-
 flink-connectors/flink-connector-kinesis/pom.xml   |   4 +-
 flink-connectors/flink-connector-rabbitmq/pom.xml  |   4 +-
 flink-container/pom.xml|   2 +-
 .../apache/flink/configuration/CoreOptions.java|   3 +-
 .../core/classloading/SubmoduleClassLoader.java|  45 +++
 .../apache/flink/util/concurrent/FutureUtils.java  |  26 +-
 flink-dist/pom.xml |  11 +-
 flink-dist/src/main/assemblies/opt.xml |   4 +-
 .../src/main/flink-bin/conf/log4j-cli.properties   |   2 +-
 .../main/flink-bin/conf/log4j-console.properties   |   2 +-
 .../main/flink-bin/conf/log4j-session.properties   |   2 +-
 .../src/main/flink-bin/conf/log4j.properties   |   2 +-
 .../src/main/flink-bin/conf/logback-console.xml|   2 +-
 flink-dist/src/main/flink-bin/conf/logback.xml |   2 +-
 flink-dist/src/main/resources/META-INF/NOTICE  |  12 -
 flink-docs/pom.xml |   4 +-
 .../pom.xml|   2 +-
 .../flink-end-to-end-tests-common/pom.xml  |   2 +-
 .../flink-metrics-availability-test/pom.xml|   2 +-
 flink-end-to-end-tests/test-scripts/common.sh  |   4 +-
 flink-examples/flink-examples-streaming/pom.xml|   2 +-
 flink-examples/flink-examples-table/pom.xml|   2 +-
 flink-formats/flink-avro/pom.xml   |   2 +-
 flink-formats/flink-compress/pom.xml   |   2 +-
 flink-formats/flink-csv/pom.xml|   2 +-
 flink-formats/flink-hadoop-bulk/pom.xml|   2 +-
 flink-formats/flink-json/pom.xml   |   2 +-
 flink-formats/flink-orc/pom.xml|   2 +-
 flink-formats/flink-parquet/pom.xml|   2 +-
 flink-formats/flink-sequence-file/pom.xml  |   2 +-
 flink-fs-tests/pom.xml |   4 +-
 flink-kubernetes/pom.xml   |   6 +-
 flink-libraries/flink-cep/pom.xml  |   4 +-
 flink-libraries/flink-gelly/pom.xml|   2 +-
 flink-libraries/flink-state-processing-api/pom.xml |   4 +-
 flink-metrics/flink-metrics-influxdb/pom.xml   |   2 +-
 flink-metrics/flink-metrics-jmx/pom.xml|   2 +-
 flink-metrics/flink-metrics-prometheus/pom.xml |   4 +-
 flink-optimizer/pom.xml|   6 +-
 flink-python/pom.xml   |   2 +-
 .../flink-queryable-state-runtime/pom.xml  |   8 +-
 flink-rpc/flink-rpc-akka-loader/pom.xml| 127 +++
 .../runtime/rpc/akka/AkkaRpcSystemLoader.java  |  78 +
 .../runtime/rpc/akka/CleanupOnCloseRpcSystem.java} |  61 +++-
 .../org.apache.flink.runtime.rpc.RpcSystemLoader   |   2 +-
 flink-rpc/flink-rpc-akka/pom.xml   | 176 ++
 .../akka/ActorSystemScheduledExecutorAdapter.java  |  22 +-
 .../runtime/concurrent/akka/ClassLoadingUtils.java | 119 +++
 .../runtime/rpc/akka/AkkaInvocationHandler.java|  12 +-
 .../flink/runtime/rpc/akka/AkkaRpcActor.java   |  51 ++-
 .../flink/runtime/rpc/akka/AkkaRpcService.java | 100 --
 .../runtime/rpc/akka/AkkaRpcServiceUtils.java  |  64 ++--
 .../rpc/akka/FencedAkkaInvocationHandler.java  |   6 +-
 .../flink/runtime/rpc/akka/FencedAkkaRpcActor.java |   5 +-
 .../src/main/resources/META-INF/NOTICE |  31 +-
 .../apache/flink/runtime/rpc/akka/AkkaUtils.scala  |  16 +-
 .../runtime/rpc/akka/CustomSSLEngineProvider.scala |   3 +-
 .../concurrent/akka/ClassLoadingUtilsTest.java | 105 ++
 .../rpc/akka/ContextClassLoadingSettingTest.java   | 381 +
 

[flink] branch release-1.12 updated (600ce81 -> 909a6a1)

2021-07-21 Thread sjwiesman
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a change to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 600ce81  [FLINK-23419][streaming] Fixed the condition in 
testNextFirstCheckpointBarrierOvertakesCancellationBarrier in order to make the 
test more stable (#16534)
 add 909a6a1  [FLINK-23429][state-processor-api] Use Path instead of 
Path.getPath() to preserve FileSystem info

No new revisions were added by this update.

Summary of changes:
 .../org/apache/flink/state/api/output/FileCopyFunction.java  |  5 ++---
 .../apache/flink/state/api/output/StatePathExtractor.java| 12 ++--
 2 files changed, 8 insertions(+), 9 deletions(-)
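
The FLINK-23429 fix above rests on a simple property: reducing a `Path` to its raw path string discards the scheme and authority that identify the target `FileSystem`. A minimal illustration of that behavior using `java.net.URI` from the JDK (a stand-in chosen here because Flink's `org.apache.flink.core.fs.Path` wraps a URI in the same way; the class and constants below are hypothetical):

```java
import java.net.URI;

public class PathSchemeDemo {
    public static void main(String[] args) {
        // A fully qualified state-file location, as a savepoint might record it.
        URI full = URI.create("hdfs://namenode:8020/savepoints/state-file");

        // getPath() keeps only the path component; the "hdfs" scheme and
        // "namenode:8020" authority needed to resolve the right FileSystem
        // are dropped.
        String rawPath = full.getPath();

        System.out.println(full.getScheme()); // hdfs
        System.out.println(rawPath);          // /savepoints/state-file

        // Rebuilding from the raw string yields a scheme-less URI that would
        // resolve against the default (local) file system -- which is why the
        // patch forwards Path objects instead of strings.
        System.out.println(URI.create(rawPath).getScheme()); // null
    }
}
```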


[flink-benchmarks] branch master updated (12b87f8 -> e58adbf)

2021-07-21 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git.


from 12b87f8  [FLINK-23208] Add a benchmark for processing timers
 add e58adbf  [hotfix] Improve README

No new revisions were added by this update.

Summary of changes:
 README.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)


[flink] branch release-1.13 updated: [FLINK-23429][state-processor-api] Use Path instead of Path.getPath() to preserve FileSystem info

2021-07-21 Thread sjwiesman
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
 new dd2945d  [FLINK-23429][state-processor-api] Use Path instead of 
Path.getPath() to preserve FileSystem info
dd2945d is described below

commit dd2945d73849a3bedf4f390a5ea8541f9815ce6e
Author: Jun Qin <11677043+qinjunje...@users.noreply.github.com>
AuthorDate: Wed Jul 21 08:54:52 2021 +0200

[FLINK-23429][state-processor-api] Use Path instead of Path.getPath() to 
preserve FileSystem info

This closes #16550
---
 .../org/apache/flink/state/api/output/FileCopyFunction.java  |  5 ++---
 .../apache/flink/state/api/output/StatePathExtractor.java| 12 ++--
 2 files changed, 8 insertions(+), 9 deletions(-)

diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java
index 9b15222..f761061 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java
@@ -32,7 +32,7 @@ import java.io.IOException;
 
 /** This output format copies files from an existing savepoint into a new 
directory. */
 @Internal
-public final class FileCopyFunction implements OutputFormat<String> {
+public final class FileCopyFunction implements OutputFormat<Path> {
 
 private static final long serialVersionUID = 1L;
 
@@ -52,8 +52,7 @@ public final class FileCopyFunction implements OutputFormat<String> {
 }
 
 @Override
-public void writeRecord(String record) throws IOException {
-Path sourcePath = new Path(record);
+public void writeRecord(Path sourcePath) throws IOException {
 Path destPath = new Path(path, sourcePath.getName());
 try (FSDataOutputStream os =
 destPath.getFileSystem()
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java
index b7c260b..b9107a5 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java
@@ -34,18 +34,18 @@ import javax.annotation.Nullable;
 
 /** Extracts all file paths that are part of the provided {@link 
OperatorState}. */
 @Internal
-public class StatePathExtractor implements FlatMapFunction<OperatorState, String> {
+public class StatePathExtractor implements FlatMapFunction<OperatorState, Path> {
 
 private static final long serialVersionUID = 1L;
 
 @Override
-public void flatMap(OperatorState operatorState, Collector<String> out) throws Exception {
+public void flatMap(OperatorState operatorState, Collector<Path> out) throws Exception {
 for (OperatorSubtaskState subTaskState : 
operatorState.getSubtaskStates().values()) {
 // managed operator state
 for (OperatorStateHandle operatorStateHandle : 
subTaskState.getManagedOperatorState()) {
 Path path = 
getStateFilePathFromStreamStateHandle(operatorStateHandle);
 if (path != null) {
-out.collect(path.getPath());
+out.collect(path);
 }
 }
 // managed keyed state
@@ -55,7 +55,7 @@ public class StatePathExtractor implements FlatMapFunction<OperatorState, String>

[flink] branch master updated: [FLINK-23429][state-processor-api] Use Path instead of Path.getPath() to preserve FileSystem info

2021-07-21 Thread sjwiesman
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new f9286a5  [FLINK-23429][state-processor-api] Use Path instead of 
Path.getPath() to preserve FileSystem info
f9286a5 is described below

commit f9286a54ded7c34d7bf49146a93ee86cc701ac9b
Author: Jun Qin <11677043+qinjunje...@users.noreply.github.com>
AuthorDate: Tue Jul 20 10:42:51 2021 +0200

[FLINK-23429][state-processor-api] Use Path instead of Path.getPath() to 
preserve FileSystem info

This closes #16542
---
 .../org/apache/flink/state/api/output/FileCopyFunction.java  |  5 ++---
 .../apache/flink/state/api/output/StatePathExtractor.java| 12 ++--
 2 files changed, 8 insertions(+), 9 deletions(-)

diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java
index 9b15222..f761061 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java
@@ -32,7 +32,7 @@ import java.io.IOException;
 
 /** This output format copies files from an existing savepoint into a new 
directory. */
 @Internal
-public final class FileCopyFunction implements OutputFormat<String> {
+public final class FileCopyFunction implements OutputFormat<Path> {
 
 private static final long serialVersionUID = 1L;
 
@@ -52,8 +52,7 @@ public final class FileCopyFunction implements OutputFormat<String> {
 }
 
 @Override
-public void writeRecord(String record) throws IOException {
-Path sourcePath = new Path(record);
+public void writeRecord(Path sourcePath) throws IOException {
 Path destPath = new Path(path, sourcePath.getName());
 try (FSDataOutputStream os =
 destPath.getFileSystem()
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java
index b7c260b..b9107a5 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java
@@ -34,18 +34,18 @@ import javax.annotation.Nullable;
 
 /** Extracts all file paths that are part of the provided {@link 
OperatorState}. */
 @Internal
-public class StatePathExtractor implements FlatMapFunction<OperatorState, String> {
+public class StatePathExtractor implements FlatMapFunction<OperatorState, Path> {
 
 private static final long serialVersionUID = 1L;
 
 @Override
-public void flatMap(OperatorState operatorState, Collector<String> out) throws Exception {
+public void flatMap(OperatorState operatorState, Collector<Path> out) throws Exception {
 for (OperatorSubtaskState subTaskState : 
operatorState.getSubtaskStates().values()) {
 // managed operator state
 for (OperatorStateHandle operatorStateHandle : 
subTaskState.getManagedOperatorState()) {
 Path path = 
getStateFilePathFromStreamStateHandle(operatorStateHandle);
 if (path != null) {
-out.collect(path.getPath());
+out.collect(path);
 }
 }
 // managed keyed state
@@ -55,7 +55,7 @@ public class StatePathExtractor implements FlatMapFunction<OperatorState, String>

[flink] branch master updated (3137184 -> 7083bb7)

2021-07-21 Thread sjwiesman
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 3137184  [FLINK-23402][streaming-java] Mark GlobalStreamExchangeMode 
as @Internal
 add 7083bb7  [hotfix][docs] fix image rendering on local installation

No new revisions were added by this update.

Summary of changes:
 docs/content/docs/try-flink/local_installation.md | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)


[flink] 02/04: [FLINK-23402][streaming-java] Fix minor code issues around 'shuffle mode'

2021-07-21 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 86f54c89c7866647e50d3957026bd0d28869ea8d
Author: Timo Walther 
AuthorDate: Mon Jul 19 15:02:38 2021 +0200

[FLINK-23402][streaming-java] Fix minor code issues around 'shuffle mode'
---
 .../flink/streaming/api/graph/StreamEdge.java  |  2 ++
 .../transformations/PartitionTransformation.java   | 20 ++--
 .../api/transformations/StreamExchangeMode.java|  9 +++---
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 36 +++---
 ...aphGeneratorWithGlobalDataExchangeModeTest.java |  2 +-
 .../plan/nodes/exec/batch/BatchExecExchange.java   |  2 +-
 .../utils/InputPriorityConflictResolver.java   |  2 +-
 .../planner/utils/StreamExchangeModeUtils.java |  2 +-
 8 files changed, 39 insertions(+), 36 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index 5ca627c..84c4110 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -194,6 +194,8 @@ public class StreamEdge implements Serializable {
 + typeNumber
 + ", outputPartitioner="
 + outputPartitioner
++ ", exchangeMode="
++ exchangeMode
 + ", bufferTimeout="
 + bufferTimeout
 + ", outputTag="
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
index 2521539..bc334d1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
@@ -35,7 +35,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * This does not create a physical operation, it only affects how upstream 
operations are
  * connected to downstream operations.
  *
- * @param <T> The type of the elements that result from this {@code PartitionTransformation}
+ * @param <T> The type of the elements that result from this {@link PartitionTransformation}
  */
 @Internal
 public class PartitionTransformation<T> extends Transformation<T> {
@@ -47,23 +47,23 @@ public class PartitionTransformation<T> extends Transformation<T> {
 private final StreamExchangeMode exchangeMode;
 
 /**
- * Creates a new {@code PartitionTransformation} from the given input and 
{@link
+ * Creates a new {@link PartitionTransformation} from the given input and 
{@link
  * StreamPartitioner}.
  *
- * @param input The input {@code Transformation}
- * @param partitioner The {@code StreamPartitioner}
+ * @param input The input {@link Transformation}
+ * @param partitioner The {@link StreamPartitioner}
  */
 public PartitionTransformation(Transformation<T> input, StreamPartitioner<T> partitioner) {
 this(input, partitioner, StreamExchangeMode.UNDEFINED);
 }
 
 /**
- * Creates a new {@code PartitionTransformation} from the given input and 
{@link
+ * Creates a new {@link PartitionTransformation} from the given input and 
{@link
  * StreamPartitioner}.
  *
- * @param input The input {@code Transformation}
- * @param partitioner The {@code StreamPartitioner}
- * @param exchangeMode The {@code ShuffleMode}
+ * @param input The input {@link Transformation}
+ * @param partitioner The {@link StreamPartitioner}
+ * @param exchangeMode The {@link StreamExchangeMode}
  */
 public PartitionTransformation(
 Transformation<T> input,
@@ -76,8 +76,8 @@ public class PartitionTransformation extends 
Transformation {
 }
 
 /**
- * Returns the {@code StreamPartitioner} that must be used for 
partitioning the elements of the
- * input {@code Transformation}.
+ * Returns the {@link StreamPartitioner} that must be used for 
partitioning the elements of the
+ * input {@link Transformation}.
  */
 public StreamPartitioner<T> getPartitioner() {
 return partitioner;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java
index 4637c07..5752add 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java
+++ 

[flink] branch master updated (9e0e092 -> 3137184)

2021-07-21 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 9e0e092  [FLINK-23407][docs] Use DescribedEnum for existing config 
option enums
 new 4e65322  [FLINK-23402][streaming-java] Refactor ShuffleMode to 
StreamExchangeMode
 new 86f54c8  [FLINK-23402][streaming-java] Fix minor code issues around 
'shuffle mode'
 new 156f517  [FLINK-23402][streaming-java] Refactor GlobalDataExchangeMode 
to GlobalStreamExchangeMode
 new 3137184  [FLINK-23402][streaming-java] Mark GlobalStreamExchangeMode 
as @Internal

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/table/tpcds/TpcdsTestProgram.java |  4 +-
 .../environment/StreamExecutionEnvironment.java|  5 +-
 ...angeMode.java => GlobalStreamExchangeMode.java} |  8 ++--
 .../flink/streaming/api/graph/StreamEdge.java  | 20 
 .../flink/streaming/api/graph/StreamGraph.java | 33 ++---
 .../streaming/api/graph/StreamGraphGenerator.java  |  8 ++--
 .../api/graph/StreamingJobGraphGenerator.java  | 12 ++---
 .../transformations/PartitionTransformation.java   | 36 +++---
 .../{ShuffleMode.java => StreamExchangeMode.java}  | 15 +++---
 .../PartitionTransformationTranslator.java |  2 +-
 ...amGraphGeneratorExecutionModeDetectionTest.java | 25 +-
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 54 ++---
 ...GeneratorWithGlobalStreamExchangeModeTest.java} | 30 ++--
 .../table/planner/plan/nodes/exec/ExecEdge.java| 31 ++--
 .../plan/nodes/exec/batch/BatchExecExchange.java   | 38 +++
 .../exec/processor/DeadlockBreakupProcessor.java   |  4 +-
 .../MultipleInputNodeCreationProcessor.java|  4 +-
 .../utils/InputPriorityConflictResolver.java   | 21 
 .../exec/serde/ExecNodeGraphJsonPlanGenerator.java | 18 +++
 .../flink/table/planner/utils/ExecutorUtils.java   |  8 ++--
 ...ModeUtils.java => StreamExchangeModeUtils.java} | 14 +++---
 .../physical/batch/BatchPhysicalExchange.scala | 16 +++
 .../utils/InputPriorityConflictResolverTest.java   | 19 
 ...sTest.java => StreamExchangeModeUtilsTest.java} | 56 +++---
 .../planner/runtime/utils/BatchTestBase.scala  |  4 +-
 .../flink/table/planner/utils/TableTestBase.scala  |  4 +-
 .../flink/test/runtime/BlockingShuffleITCase.java  |  4 +-
 27 files changed, 255 insertions(+), 238 deletions(-)
 rename 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/{GlobalDataExchangeMode.java
 => GlobalStreamExchangeMode.java} (89%)
 rename 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/{ShuffleMode.java
 => StreamExchangeMode.java} (72%)
 rename 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/{StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.java
 => StreamingJobGraphGeneratorWithGlobalStreamExchangeModeTest.java} (87%)
 rename 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/{ShuffleModeUtils.java
 => StreamExchangeModeUtils.java} (79%)
 rename 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/{ShuffleModeUtilsTest.java
 => StreamExchangeModeUtilsTest.java} (57%)


[flink] 03/04: [FLINK-23402][streaming-java] Refactor GlobalDataExchangeMode to GlobalStreamExchangeMode

2021-07-21 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 156f517d387202ac292bde5bfac423a23908b7a2
Author: Timo Walther 
AuthorDate: Mon Jul 19 16:14:51 2021 +0200

[FLINK-23402][streaming-java] Refactor GlobalDataExchangeMode to 
GlobalStreamExchangeMode
---
 .../apache/flink/table/tpcds/TpcdsTestProgram.java |  4 +-
 .../environment/StreamExecutionEnvironment.java|  5 ++-
 ...angeMode.java => GlobalStreamExchangeMode.java} |  2 +-
 .../flink/streaming/api/graph/StreamGraph.java | 10 ++---
 .../streaming/api/graph/StreamGraphGenerator.java  |  8 ++--
 .../api/graph/StreamingJobGraphGenerator.java  |  4 +-
 ...amGraphGeneratorExecutionModeDetectionTest.java | 25 ++-
 ...GeneratorWithGlobalStreamExchangeModeTest.java} | 22 +-
 .../plan/nodes/exec/batch/BatchExecExchange.java   |  4 +-
 .../flink/table/planner/utils/ExecutorUtils.java   |  8 ++--
 .../planner/utils/StreamExchangeModeUtils.java | 10 ++---
 .../physical/batch/BatchPhysicalExchange.scala |  4 +-
 .../planner/utils/StreamExchangeModeUtilsTest.java | 48 +++---
 .../planner/runtime/utils/BatchTestBase.scala  |  4 +-
 .../flink/table/planner/utils/TableTestBase.scala  |  4 +-
 .../flink/test/runtime/BlockingShuffleITCase.java  |  4 +-
 16 files changed, 85 insertions(+), 81 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-tpcds-test/src/main/java/org/apache/flink/table/tpcds/TpcdsTestProgram.java
 
b/flink-end-to-end-tests/flink-tpcds-test/src/main/java/org/apache/flink/table/tpcds/TpcdsTestProgram.java
index 1ade3bb..3f46dc2 100644
--- 
a/flink-end-to-end-tests/flink-tpcds-test/src/main/java/org/apache/flink/table/tpcds/TpcdsTestProgram.java
+++ 
b/flink-end-to-end-tests/flink-tpcds-test/src/main/java/org/apache/flink/table/tpcds/TpcdsTestProgram.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.tpcds;
 
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode;
+import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
@@ -145,7 +145,7 @@ public class TpcdsTestProgram {
 .getConfiguration()
 .setString(
 ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
-
GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED.toString());
+
GlobalStreamExchangeMode.POINTWISE_EDGES_PIPELINED.toString());
 tEnv.getConfig()
 .getConfiguration()
 .setLong(
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 9670b39..8b0dd6a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -93,7 +93,7 @@ import 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import 
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
-import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode;
+import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
@@ -2110,7 +2110,8 @@ public class StreamExecutionEnvironment {
 if (configuration.get(ExecutionOptions.RUNTIME_MODE) == 
RuntimeExecutionMode.BATCH
 && streamGraph.hasFineGrainedResource()) {
 if 
(configuration.get(ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING)) {
-
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
+streamGraph.setGlobalStreamExchangeMode(
+GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
 } else {
 throw new IllegalConfigurationException(
 "At the moment, fine-grained resource management 
requires batch workloads to "
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalStreamExchangeMode.java
similarity index 98%
rename from 

[flink] 04/04: [FLINK-23402][streaming-java] Mark GlobalStreamExchangeMode as @Internal

2021-07-21 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 313718466d15b473bd5bf1dcf0d9d988e0fd5979
Author: Timo Walther 
AuthorDate: Mon Jul 19 16:20:03 2021 +0200

[FLINK-23402][streaming-java] Mark GlobalStreamExchangeMode as @Internal

This closes #16536.
---
 .../org/apache/flink/streaming/api/graph/GlobalStreamExchangeMode.java  | 2 ++
 1 file changed, 2 insertions(+)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalStreamExchangeMode.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalStreamExchangeMode.java
index 2befa4b..ee783e3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalStreamExchangeMode.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalStreamExchangeMode.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.graph;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
@@ -27,6 +28,7 @@ import 
org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
  * This mode decides the default {@link ResultPartitionType} of job edges. 
Note that this only
  * affects job edges which are {@link StreamExchangeMode#UNDEFINED}.
  */
+@Internal
 public enum GlobalStreamExchangeMode {
 /** Set all job edges to be {@link ResultPartitionType#BLOCKING}. */
 ALL_EDGES_BLOCKING,
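
The javadoc above pins down the contract between the two enums: a `GlobalStreamExchangeMode` only supplies a default for edges whose per-edge `StreamExchangeMode` is `UNDEFINED`. A self-contained sketch of that resolution rule, with simplified, hypothetical names (the real logic in `StreamGraph` also distinguishes forward/rescale partitioners and more global modes):

```java
public class ExchangeModeResolution {
    // Simplified stand-ins for the per-edge and global settings.
    enum StreamExchangeMode { PIPELINED, BATCH, UNDEFINED }
    enum GlobalStreamExchangeMode { ALL_EDGES_BLOCKING, ALL_EDGES_PIPELINED }

    // An explicitly set edge mode always wins; only UNDEFINED edges
    // fall back to the global default.
    static StreamExchangeMode resolve(
            StreamExchangeMode edgeMode, GlobalStreamExchangeMode globalMode) {
        if (edgeMode != StreamExchangeMode.UNDEFINED) {
            return edgeMode;
        }
        return globalMode == GlobalStreamExchangeMode.ALL_EDGES_BLOCKING
                ? StreamExchangeMode.BATCH
                : StreamExchangeMode.PIPELINED;
    }

    public static void main(String[] args) {
        System.out.println(resolve(StreamExchangeMode.PIPELINED,
                GlobalStreamExchangeMode.ALL_EDGES_BLOCKING)); // PIPELINED
        System.out.println(resolve(StreamExchangeMode.UNDEFINED,
                GlobalStreamExchangeMode.ALL_EDGES_BLOCKING)); // BATCH
    }
}
```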


[flink] 01/04: [FLINK-23402][streaming-java] Refactor ShuffleMode to StreamExchangeMode

2021-07-21 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4e65322dc1b5f80a7f3a42f0f205f978357daa40
Author: Timo Walther 
AuthorDate: Mon Jul 19 14:22:18 2021 +0200

[FLINK-23402][streaming-java] Refactor ShuffleMode to StreamExchangeMode
---
 .../api/graph/GlobalDataExchangeMode.java  |  4 +--
 .../flink/streaming/api/graph/StreamEdge.java  | 18 ++--
 .../flink/streaming/api/graph/StreamGraph.java | 23 
 .../api/graph/StreamingJobGraphGenerator.java  |  8 +++---
 .../transformations/PartitionTransformation.java   | 18 ++--
 .../{ShuffleMode.java => StreamExchangeMode.java}  |  6 ++--
 .../PartitionTransformationTranslator.java |  2 +-
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 30 ++--
 ...aphGeneratorWithGlobalDataExchangeModeTest.java |  8 +++---
 .../table/planner/plan/nodes/exec/ExecEdge.java| 31 +++--
 .../plan/nodes/exec/batch/BatchExecExchange.java   | 32 +++---
 .../exec/processor/DeadlockBreakupProcessor.java   |  4 +--
 .../MultipleInputNodeCreationProcessor.java|  4 +--
 .../utils/InputPriorityConflictResolver.java   | 21 +++---
 .../exec/serde/ExecNodeGraphJsonPlanGenerator.java | 18 ++--
 .../flink/table/planner/utils/ExecutorUtils.java   |  2 +-
 ...ModeUtils.java => StreamExchangeModeUtils.java} |  2 +-
 .../physical/batch/BatchPhysicalExchange.scala | 12 
 .../utils/InputPriorityConflictResolverTest.java   | 19 +++--
 ...sTest.java => StreamExchangeModeUtilsTest.java} | 28 +--
 20 files changed, 149 insertions(+), 141 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java
index b2b5f21..3118f7f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java
@@ -19,13 +19,13 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
 
 /**
  * This mode decides the default {@link ResultPartitionType} of job edges. 
Note that this only
- * affects job edges which are {@link ShuffleMode#UNDEFINED}.
+ * affects job edges which are {@link StreamExchangeMode#UNDEFINED}.
  */
 public enum GlobalDataExchangeMode {
 /** Set all job edges to be {@link ResultPartitionType#BLOCKING}. */
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index f76edd5..5ca627c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.util.OutputTag;
 
@@ -58,7 +58,7 @@ public class StreamEdge implements Serializable {
 /** The name of the operator in the target vertex. */
 private final String targetOperatorName;
 
-private final ShuffleMode shuffleMode;
+private final StreamExchangeMode exchangeMode;
 
 private long bufferTimeout;
 
@@ -78,7 +78,7 @@ public class StreamEdge implements Serializable {
 ALWAYS_FLUSH_BUFFER_TIMEOUT,
 outputPartitioner,
 outputTag,
-ShuffleMode.UNDEFINED);
+StreamExchangeMode.UNDEFINED);
 }
 
 public StreamEdge(
@@ -87,7 +87,7 @@ public class StreamEdge implements Serializable {
 int typeNumber,
 StreamPartitioner outputPartitioner,
 OutputTag outputTag,
-ShuffleMode shuffleMode) {
+StreamExchangeMode exchangeMode) {
 
 this(
 sourceVertex,
@@ -96,7 +96,7 @@ public class StreamEdge implements Serializable {
 sourceVertex.getBufferTimeout(),
 outputPartitioner,
 outputTag,
-shuffleMode);
+exchangeMode);
 }

[flink] branch master updated (3d1c11e -> 9e0e092)

2021-07-21 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 3d1c11e  [FLINK-20562][sql-client][table] Support explain details for 
EXPLAIN statement
 add cdf6174  [FLINK-23407][docs][core] Documented enums for config options
 add 9e0e092  [FLINK-23407][docs] Use DescribedEnum for existing config 
option enums

No new revisions were added by this update.

Summary of changes:
 .../generated/cluster_configuration.html   |  8 +--
 .../execution_checkpointing_configuration.html |  8 +--
 .../generated/execution_config_configuration.html  |  8 +--
 .../generated/execution_configuration.html |  4 +-
 .../generated/expert_cluster_section.html  |  8 +--
 .../generated/expert_scheduling_section.html   |  4 +-
 .../generated/influxdb_reporter_configuration.html |  8 +--
 .../generated/job_manager_configuration.html   |  4 +-
 .../generated/kubernetes_config_configuration.html |  8 +--
 .../generated/pipeline_configuration.html  |  4 +-
 .../rocksdb_configurable_configuration.html|  8 +--
 .../generated/rocksdb_configuration.html   |  4 +-
 .../generated/sql_client_configuration.html|  4 +-
 .../generated/state_backend_rocksdb_section.html   |  4 +-
 .../generated/stream_pipeline_configuration.html   |  4 +-
 .../generated/yarn_config_configuration.html   |  4 +-
 .../apache/flink/api/common/ExecutionConfig.java   | 25 +---
 .../apache/flink/configuration/ClusterOptions.java | 50 ++-
 .../{WritableConfig.java => DescribedEnum.java}| 26 +---
 .../flink/configuration/PipelineOptions.java   | 16 +
 .../configuration/description/TextElement.java |  7 +++
 .../configuration/ConfigOptionsDocGenerator.java   | 72 +++---
 .../ConfigOptionsDocGeneratorTest.java | 39 ++--
 .../ConfigOptionsDocsCompletenessITCase.java   |  9 +--
 .../configuration/KubernetesConfigOptions.java |  4 +-
 .../state/EmbeddedRocksDBStateBackend.java | 37 ---
 .../contrib/streaming/state/RocksDBOptions.java| 18 +++---
 .../flink/table/client/config/ResultMode.java  | 28 ++---
 .../table/client/config/SqlClientOptions.java  |  6 +-
 .../test/checkpointing/TimersSavepointITCase.java  |  5 +-
 .../flink/test/state/BackendSwitchSpecs.java   |  7 ++-
 .../yarn/configuration/YarnConfigOptions.java  | 34 +-
 32 files changed, 295 insertions(+), 180 deletions(-)
 copy flink-core/src/main/java/org/apache/flink/configuration/{WritableConfig.java => DescribedEnum.java} (50%)
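The FLINK-23407 change above introduces a DescribedEnum interface (copied from WritableConfig.java) so that config-option enum constants can carry their own documentation for the generated HTML tables. A minimal, Flink-free sketch of the pattern — note the interface here returns a plain String, whereas Flink's actual DescribedEnum works with its description-element types, and the enum name and description texts below are invented for illustration:

```java
// Simplified stand-in for Flink's DescribedEnum interface (assumption:
// the real one returns a rich description element, not a String).
interface DescribedEnumSketch {
    String getDescription();
}

// A config-option enum whose values carry their own documentation, so a
// doc generator can render one table row per constant.
enum RestartStrategySketch implements DescribedEnumSketch {
    NONE("No restart on failure."),
    FIXED_DELAY("Restart a fixed number of times, waiting between attempts.");

    private final String description;

    RestartStrategySketch(String description) {
        this.description = description;
    }

    @Override
    public String getDescription() {
        return description;
    }
}

public class DescribedEnumDemo {
    public static void main(String[] args) {
        // A doc generator would iterate the constants and emit one row each.
        for (RestartStrategySketch v : RestartStrategySketch.values()) {
            System.out.println(v.name() + ": " + v.getDescription());
        }
    }
}
```

The point of the design is that the description lives next to the constant it documents, so the generated docs cannot silently drift out of sync with the code.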


[flink-benchmarks] branch master updated: [FLINK-23208] Add a benchmark for processing timers

2021-07-21 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git


The following commit(s) were added to refs/heads/master by this push:
 new 12b87f8  [FLINK-23208] Add a benchmark for processing timers
12b87f8 is described below

commit 12b87f86903e1d2268004cd4f295ddf8c1d33f0a
Author: Jiayi Liao 
AuthorDate: Wed Jul 21 11:11:45 2021 +0800

[FLINK-23208] Add a benchmark for processing timers
---
 .../flink/benchmark/ProcessingTimerBenchmark.java  | 123 +
 1 file changed, 123 insertions(+)

diff --git a/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java b/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java
new file mode 100644
index 0000000..dee911e
--- /dev/null
+++ b/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Collector;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.util.Random;
+
+@OperationsPerInvocation(value = ProcessingTimerBenchmark.PROCESSING_TIMERS_PER_INVOCATION)
+public class ProcessingTimerBenchmark extends BenchmarkBase {
+
+    public static final int PROCESSING_TIMERS_PER_INVOCATION = 1_000;
+
+    private static final int PARALLELISM = 1;
+
+    private static OneShotLatch LATCH = new OneShotLatch();
+
+    public static void main(String[] args) throws RunnerException {
+        Options options = new OptionsBuilder()
+                .verbosity(VerboseMode.NORMAL)
+                .include(".*" + ProcessingTimerBenchmark.class.getCanonicalName() + ".*")
+                .build();
+
+        new Runner(options).run();
+    }
+
+    @Benchmark
+    public void fireProcessingTimers(FlinkEnvironmentContext context) throws Exception {
+        LATCH.reset();
+        StreamExecutionEnvironment env = context.env;
+        env.setParallelism(PARALLELISM);
+
+        env.addSource(new SingleRecordSource())
+                .keyBy(String::hashCode)
+                .process(new ProcessingTimerKeyedProcessFunction(PROCESSING_TIMERS_PER_INVOCATION))
+                .addSink(new DiscardingSink<>());
+
+        env.execute();
+    }
+
+    private static class SingleRecordSource extends RichParallelSourceFunction<String> {
+
+        private Random random;
+
+        public SingleRecordSource() {}
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            this.random = new Random();
+        }
+
+        @Override
+        public void run(SourceContext<String> sourceContext) throws Exception {
+            synchronized (sourceContext.getCheckpointLock()) {
+                sourceContext.collect(String.valueOf(random.nextLong()));
+            }
+
+            LATCH.await();
+        }
+
+        @Override
+        public void cancel() {}
+    }
+
+    private static class ProcessingTimerKeyedProcessFunction
+            extends KeyedProcessFunction<Integer, String, String> {
+
+        private final long timersPerRecord;
+        private long firedTimesCount;
+
+        public ProcessingTimerKeyedProcessFunction(long timersPerRecord) {
+            this.timersPerRecord = timersPerRecord;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            this.firedTimesCount = 0;
+        }
+
+        @Override
+        public 
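The diff above is cut off just before processElement. Judging from the visible fields (timersPerRecord, firedTimesCount) and the one-shot latch, the function presumably registers timersPerRecord processing-time timers and releases the latch once the last one has fired, so the benchmark measures only timer firing. A Flink-free sketch of that bookkeeping — the method names and completion logic here are assumptions, not the actual commit:

```java
import java.util.PriorityQueue;

// Flink-free sketch of the likely timer bookkeeping: register N timers,
// fire them in timestamp order, and signal completion after the N-th firing.
public class TimerCountingSketch {

    private final long timersPerRecord;
    private long firedTimesCount;
    private boolean done;

    // Stand-in for Flink's internal timer service: timers fire in order.
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    public TimerCountingSketch(long timersPerRecord) {
        this.timersPerRecord = timersPerRecord;
    }

    // Mirrors what processElement presumably does: register one timer per unit.
    public void processElement(long baseTimestamp) {
        for (long i = 0; i < timersPerRecord; i++) {
            timers.add(baseTimestamp + i);
        }
    }

    // Mirrors onTimer: count each firing and flag completion after the last one.
    public void fireAll() {
        while (!timers.isEmpty()) {
            timers.poll();
            if (++firedTimesCount == timersPerRecord) {
                done = true; // in the benchmark this would be LATCH.trigger()
            }
        }
    }

    public long getFiredTimesCount() { return firedTimesCount; }

    public boolean isDone() { return done; }

    public static void main(String[] args) {
        TimerCountingSketch sketch = new TimerCountingSketch(1_000);
        sketch.processElement(0L);
        sketch.fireAll();
        System.out.println(sketch.getFiredTimesCount() + " " + sketch.isDone());
    }
}
```

The latch matters because the source blocks after emitting its single record: without a completion signal, env.execute() would never return and JMH could not time the invocation.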

[flink] branch master updated: [FLINK-20562][sql-client][table] Support explain details for EXPLAIN statement

2021-07-21 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 3d1c11e  [FLINK-20562][sql-client][table] Support explain details for EXPLAIN statement
3d1c11e is described below

commit 3d1c11e433f28817f1bb70d5ed028c81db705f27
Author: zhaown <51357674+chao...@users.noreply.github.com>
AuthorDate: Wed Jul 21 18:41:14 2021 +0800

[FLINK-20562][sql-client][table] Support explain details for EXPLAIN statement

This closes #15317
---
 .../flink/table/client/cli/CliClientITCase.java|   7 +-
 .../src/test/resources/sql/table.q | 238 +
 .../src/main/codegen/data/Parser.tdd   |  10 +-
 .../src/main/codegen/includes/parserImpls.ftl  |  36 +++-
 .../flink/sql/parser/dql/SqlRichExplain.java   |  22 +-
 .../flink/sql/parser/utils/ParserResource.java |   3 +
 .../ParserResource.properties  |   1 +
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  60 +-
 .../table/api/internal/TableEnvironmentImpl.java   |   8 +-
 .../flink/table/operations/ExplainOperation.java   |  29 ++-
 .../operations/SqlToOperationConverter.java|   7 +-
 .../operations/SqlToOperationConverterTest.java|  34 +++
 .../testExecuteSqlWithExplainDetailsAndUnion.out   |  55 +
 .../testExecuteSqlWithExplainDetailsInsert.out |  70 ++
 .../testExecuteSqlWithExplainDetailsSelect.out |  45 
 .../flink/table/api/TableEnvironmentTest.scala | 122 ---
 16 files changed, 703 insertions(+), 44 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
index be8d277..e576d09 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
@@ -302,13 +302,18 @@ public class CliClientITCase extends AbstractTestBase {
             out.append(sqlScript.comment).append(sqlScript.sql);
             if (i < results.size()) {
                 Result result = results.get(i);
-                out.append(result.content).append(result.highestTag.tag).append("\n");
+                String content = removeStreamNodeId(result.content);
+                out.append(content).append(result.highestTag.tag).append("\n");
             }
         }
 
         return out.toString();
     }
 
+    private static String removeStreamNodeId(String s) {
+        return s.replaceAll("\"id\" : \\d+", "\"id\" : ");
+    }
+
     private static final class Result {
         final String content;
         final Tag highestTag;
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q
index 157808e..31797f3 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q
@@ -447,3 +447,241 @@ Sink(table=[default_catalog.default_database.orders2], fields=[user, product, am
 +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
 
 !ok
+
+# test explain insert with json format
+explain json_execution_plan insert into orders2 select `user`, product, amount, ts from orders;
+== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3])
+   +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+      +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+         +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+   +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+   +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])",
+    "pact" : "Data Source",
+    "contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, 
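The removeStreamNodeId helper added in this commit's CliClientITCase change keeps the IT case deterministic: generated stream-node ids differ between runs (which is why the expected table.q output above shows the blanked form `"id" : ,`), so the ids are stripped before plan output is compared. A standalone illustration of the same regex — the sample JSON strings are invented for the demo:

```java
public class NodeIdNormalizer {

    // Same regex as the helper in the diff: blank out the numeric node id
    // so physical-plan JSON from different runs compares equal.
    static String removeStreamNodeId(String s) {
        return s.replaceAll("\"id\" : \\d+", "\"id\" : ");
    }

    public static void main(String[] args) {
        String planA = "{ \"id\" : 7, \"pact\" : \"Data Source\" }";
        String planB = "{ \"id\" : 42, \"pact\" : \"Data Source\" }";
        // After normalization both plans are identical.
        System.out.println(removeStreamNodeId(planA).equals(removeStreamNodeId(planB))); // prints "true"
    }
}
```

Normalizing volatile fields before comparison is the standard way to make golden-file tests like table.q stable across runs.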

[flink] branch release-1.12 updated (da62d0d -> 600ce81)

2021-07-21 Thread yuanmei
This is an automated email from the ASF dual-hosted git repository.

yuanmei pushed a change to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git.


 from da62d0d  [FLINK-20975][hive][tests] Allow integral partition filter pushdown
  add 600ce81  [FLINK-23419][streaming] Fixed the condition in testNextFirstCheckpointBarrierOvertakesCancellationBarrier in order to make the test more stable (#16534)

No new revisions were added by this update.

Summary of changes:
 .../runtime/io/CheckpointBarrierTrackerTest.java   | 30 --
 1 file changed, 22 insertions(+), 8 deletions(-)


[flink-shaded] tag release-14.0 created (now 4782c96)

2021-07-21 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to tag release-14.0
in repository https://gitbox.apache.org/repos/asf/flink-shaded.git.


  at 4782c96  (commit)
No new revisions were added by this update.


svn commit: r48919 - /dev/flink/flink-shaded-14.0-rc1/ /release/flink/flink-shaded-14.0/

2021-07-21 Thread chesnay
Author: chesnay
Date: Wed Jul 21 07:26:16 2021
New Revision: 48919

Log:
Release Flink-shaded 14.0

Added:
release/flink/flink-shaded-14.0/
  - copied from r48918, dev/flink/flink-shaded-14.0-rc1/
Removed:
dev/flink/flink-shaded-14.0-rc1/



[flink] branch master updated (ae136d1 -> ed39fb2)

2021-07-21 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from ae136d1  [FLINK-23186][docs] improve installation page
 add ed39fb2  [FLINK-23395] Bump Okhttp to 3.14.9

No new revisions were added by this update.

Summary of changes:
 .../flink-end-to-end-tests-common/pom.xml  |  1 -
 .../flink-metrics-reporter-prometheus-test/pom.xml |  1 -
 flink-kubernetes/pom.xml   | 18 +++---
 flink-kubernetes/src/main/resources/META-INF/NOTICE|  6 +++---
 flink-metrics/flink-metrics-datadog/pom.xml| 10 +-
 .../src/main/resources/META-INF/NOTICE |  4 ++--
 .../src/main/resources/META-INF/NOTICE |  4 ++--
 pom.xml| 12 
 8 files changed, 35 insertions(+), 21 deletions(-)