[flink] branch master updated (073f9f3f -> 3671316)
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 073f9f3f [FLINK-23455][e2e] Remove the usage of the sql-client YAML file in SQLJobSubmission
     add 3671316 [FLINK-23446][e2e] Refactor SQL Client end to end tests to replace YAML file with SQL DDL

No new revisions were added by this update.

Summary of changes:
 .../test-scripts/kafka_sql_common.sh | 64 +++--
 .../test-scripts/test_sql_client.sh | 102 -
 2 files changed, 53 insertions(+), 113 deletions(-)
[flink] branch master updated (6709573 -> 073f9f3f)
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 6709573 [FLINK-23204] Provide StateBackends access to MailboxExecutor (#16531)
     add 073f9f3f [FLINK-23455][e2e] Remove the usage of the sql-client YAML file in SQLJobSubmission

No new revisions were added by this update.

Summary of changes:
 .../flink/tests/util/flink/FlinkContainer.java | 14 -
 .../flink/tests/util/flink/FlinkDistribution.java | 12
 .../flink/tests/util/flink/SQLJobSubmission.java | 33 ++
 3 files changed, 2 insertions(+), 57 deletions(-)
[flink] branch master updated (a26887e -> 6709573)
This is an automated email from the ASF dual-hosted git repository. yuanmei pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from a26887e [FLINK-23434][table-planner] Fix the inconsistent type in IncrementalAggregateRule when the query has one distinct agg function and count star agg function add 6709573 [FLINK-23204] Provide StateBackends access to MailboxExecutor (#16531) No new revisions were added by this update. Summary of changes: .../flink/runtime/execution/Environment.java | 18 .../flink/runtime/mailbox}/MailboxExecutor.java| 10 +++ .../runtime/taskmanager/RuntimeEnvironment.java| 31 .../runtime/mailbox}/SyncMailboxExecutor.java | 2 +- .../operators/testutils/MockEnvironment.java | 34 ++ .../changelog/ChangelogKeyedStateBackend.java | 12 +++- .../state/changelog/ChangelogStateBackend.java | 4 ++- .../source/ContinuousFileReaderOperator.java | 2 +- .../ContinuousFileReaderOperatorFactory.java | 2 +- .../api/operators/StreamOperatorFactoryUtil.java | 1 + .../api/operators/YieldingOperatorFactory.java | 1 + .../api/operators/async/AsyncWaitOperator.java | 2 +- .../operators/async/AsyncWaitOperatorFactory.java | 2 +- .../io/checkpointing/CheckpointedInputGate.java| 2 +- .../io/checkpointing/InputProcessorUtil.java | 2 +- .../tasks/ProcessingTimeServiceFactory.java| 2 +- .../runtime/tasks/StreamOperatorWrapper.java | 2 +- .../flink/streaming/runtime/tasks/StreamTask.java | 5 +++- .../tasks/mailbox/MailboxExecutorFactory.java | 2 +- .../runtime/tasks/mailbox/MailboxExecutorImpl.java | 2 +- .../runtime/tasks/mailbox/MailboxProcessor.java| 2 +- .../runtime/tasks/mailbox/TaskMailbox.java | 2 +- .../api/graph/StreamingJobGraphGeneratorTest.java | 2 +- .../runtime/io/StreamTaskNetworkInputTest.java | 2 +- .../AlignedCheckpointsMassiveRandomTest.java | 2 +- .../io/checkpointing/AlignedCheckpointsTest.java | 2 +- .../checkpointing/AlternatingCheckpointsTest.java | 2 +- .../CheckpointBarrierTrackerTest.java | 2 +- 
.../io/checkpointing/InputProcessorUtilTest.java | 2 +- .../io/checkpointing/UnalignedCheckpointsTest.java | 2 +- .../runtime/operators/MailboxOperatorTest.java | 2 +- .../operators/StreamTaskOperatorTimerTest.java | 2 +- .../runtime/tasks/StreamOperatorWrapperTest.java | 2 +- .../tasks/StreamTaskMailboxTestHarness.java| 2 +- .../streaming/runtime/tasks/StreamTaskTest.java| 2 +- .../runtime/tasks/StreamTaskTestHarness.java | 2 +- .../tasks/mailbox/MailboxExecutorImplTest.java | 2 +- .../tasks/mailbox/TaskMailboxProcessorTest.java| 2 +- .../util/TestCheckpointedInputGateBuilder.java | 4 +-- 39 files changed, 138 insertions(+), 40 deletions(-) rename {flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators => flink-runtime/src/main/java/org/apache/flink/runtime/mailbox}/MailboxExecutor.java (97%) rename {flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators => flink-runtime/src/test/java/org/apache/flink/runtime/mailbox}/SyncMailboxExecutor.java (97%)
[flink] branch master updated (a123091 -> a26887e)
This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from a123091 [FLINK-23401][python] Separate data and timer processing into different channels
     add a26887e [FLINK-23434][table-planner] Fix the inconsistent type in IncrementalAggregateRule when the query has one distinct agg function and count star agg function

No new revisions were added by this update.

Summary of changes:
 .../stream/StreamExecGlobalGroupAggregate.java | 17 +
 .../StreamPhysicalGlobalGroupAggregate.scala | 19 +-
 .../physical/stream/IncrementalAggregateRule.scala | 21 +-
 .../table/planner/plan/utils/AggregateUtil.scala | 33 +
 .../stream/IncrementalAggregateJsonPlanTest.java | 21 +
 ...AggregateWithSumCountDistinctAndRetraction.out} | 662 -
 .../plan/stream/sql/agg/DistinctAggregateTest.xml | 179 +-
 .../stream/sql/agg/IncrementalAggregateTest.xml | 41 +-
 .../stream/sql/agg/DistinctAggregateTest.scala | 16 +
 .../runtime/stream/sql/SplitAggregateITCase.scala | 52 ++
 10 files changed, 744 insertions(+), 317 deletions(-)
 copy flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/{GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out => IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out} (56%)
[flink] 01/03: [FLINK-23401][python] Refactor the construction of transformation into getTransforms
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 1a653c5acbe6ecd347ffbe3686eb324f88c0596a Author: Dian Fu AuthorDate: Fri Jul 2 15:12:09 2021 +0800 [FLINK-23401][python] Refactor the construction of transformation into getTransforms This closes #16541. --- .../java/org/apache/flink/python/Constants.java| 8 .../beam/BeamDataStreamPythonFunctionRunner.java | 35 -- .../python/beam/BeamPythonFunctionRunner.java | 56 +++--- .../python/beam/BeamTablePythonFunctionRunner.java | 31 ++-- 4 files changed, 85 insertions(+), 45 deletions(-) diff --git a/flink-python/src/main/java/org/apache/flink/python/Constants.java b/flink-python/src/main/java/org/apache/flink/python/Constants.java index 54d1d06..555ccc2 100644 --- a/flink-python/src/main/java/org/apache/flink/python/Constants.java +++ b/flink-python/src/main/java/org/apache/flink/python/Constants.java @@ -27,4 +27,12 @@ public class Constants { // coder urns public static final String FLINK_CODER_URN = "flink:coder:v1"; + +// execution graph +public static final String TRANSFORM_ID = "transform"; +public static final String MAIN_INPUT_NAME = "input"; +public static final String MAIN_OUTPUT_NAME = "output"; + +public static final String INPUT_COLLECTION_ID = "input"; +public static final String OUTPUT_COLLECTION_ID = "output"; } diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java index 5024ed0..a9f5b91 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java @@ -25,11 +25,21 @@ import 
org.apache.flink.python.env.PythonEnvironmentManager; import org.apache.flink.python.metric.FlinkMetricContainer; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.util.Preconditions; + +import org.apache.beam.model.pipeline.v1.RunnerApi; import javax.annotation.Nullable; +import java.util.Collections; import java.util.Map; +import static org.apache.flink.python.Constants.INPUT_COLLECTION_ID; +import static org.apache.flink.python.Constants.MAIN_INPUT_NAME; +import static org.apache.flink.python.Constants.MAIN_OUTPUT_NAME; +import static org.apache.flink.python.Constants.OUTPUT_COLLECTION_ID; +import static org.apache.flink.python.Constants.TRANSFORM_ID; + /** * {@link BeamDataStreamPythonFunctionRunner} is responsible for starting a beam python harness to * execute user defined python function. @@ -37,6 +47,7 @@ import java.util.Map; @Internal public class BeamDataStreamPythonFunctionRunner extends BeamPythonFunctionRunner { +private final String functionUrn; private final FlinkFnApi.UserDefinedDataStreamFunction userDefinedDataStreamFunction; public BeamDataStreamPythonFunctionRunner( @@ -56,7 +67,6 @@ public class BeamDataStreamPythonFunctionRunner extends BeamPythonFunctionRunner super( taskName, environmentManager, -functionUrn, jobOptions, flinkMetricContainer, stateBackend, @@ -66,11 +76,28 @@ public class BeamDataStreamPythonFunctionRunner extends BeamPythonFunctionRunner managedMemoryFraction, inputCoderDescriptor, outputCoderDescriptor); -this.userDefinedDataStreamFunction = userDefinedDataStreamFunction; +this.functionUrn = Preconditions.checkNotNull(functionUrn); +this.userDefinedDataStreamFunction = +Preconditions.checkNotNull(userDefinedDataStreamFunction); } @Override -protected byte[] getUserDefinedFunctionsProtoBytes() { -return this.userDefinedDataStreamFunction.toByteArray(); +protected Map getTransforms() { +return Collections.singletonMap( +TRANSFORM_ID, 
+RunnerApi.PTransform.newBuilder() +.setUniqueName(TRANSFORM_ID) +.setSpec( +RunnerApi.FunctionSpec.newBuilder() +.setUrn(functionUrn) +.setPayload( + org.apache.beam.vendor.grpc.v1p26p0.com.google + .protobuf.ByteString.copyFrom( + userDefinedDataStreamFunction +
[flink] branch master updated (be82c54 -> a123091)
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from be82c54 [hotfix][coordination] Provide richer exception message for suppressed exceptions durnig coordinator failover new 1a653c5 [FLINK-23401][python] Refactor the construction of transformation into getTransforms new 9df4b76 [FLINK-23401][python] Use ParDoPayload to represent the user-defined functions in Python DataStream API new a123091 [FLINK-23401][python] Separate data and timer processing into different channels The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../fn_execution/beam/beam_coder_impl_fast.pyx | 4 +- .../pyflink/fn_execution/beam/beam_operations.py | 36 ++--- .../fn_execution/beam/beam_operations_fast.pxd | 5 +- .../fn_execution/beam/beam_operations_fast.pyx | 31 +++-- .../fn_execution/beam/beam_operations_slow.py | 12 ++ .../pyflink/fn_execution/beam/beam_stream_fast.pxd | 2 +- .../pyflink/fn_execution/beam/beam_stream_fast.pyx | 22 ++- .../pyflink/fn_execution/coder_impl_fast.pyx | 3 +- .../pyflink/fn_execution/coder_impl_slow.py| 116 .../fn_execution/datastream/input_handler.py | 114 .../pyflink/fn_execution/datastream/operations.py | 107 +++ .../fn_execution/datastream/output_handler.py | 63 - .../fn_execution/datastream/timerservice_impl.py | 129 ++ .../datastream/window/window_operator.py | 22 +-- .../pyflink/fn_execution/table/operations.py | 2 +- .../fn_execution/table/window_aggregate_fast.pyx | 6 +- .../fn_execution/table/window_aggregate_slow.py| 6 +- .../pyflink/fn_execution/table/window_context.py | 7 +- .../java/org/apache/flink/python/Constants.java| 13 ++ .../apache/flink/python/PythonFunctionRunner.java | 3 + .../python/OneInputPythonFunctionOperator.java | 4 
+- .../python/PythonKeyedCoProcessOperator.java | 147 ++--- .../python/PythonKeyedProcessOperator.java | 116 +--- .../python/TwoInputPythonFunctionOperator.java | 4 +- .../python/collector/RunnerOutputCollector.java| 60 + .../api/operators/python/timer/TimerHandler.java | 65 + .../python/timer/TimerRegistration.java} | 118 + .../api/operators/python/timer/TimerUtils.java | 45 +++ .../beam/BeamDataStreamPythonFunctionRunner.java | 98 +- .../python/beam/BeamPythonFunctionRunner.java | 128 -- .../flink/streaming/api/utils/PythonTypeUtils.java | 54 +--- .../utils/input/KeyedInputWithTimerRowFactory.java | 85 .../input/KeyedTwoInputWithTimerRowFactory.java| 87 .../flink/streaming/api/utils/input/TimerType.java | 31 - .../api/utils/output/TimerOperandType.java | 54 .../python/beam/BeamTablePythonFunctionRunner.java | 50 ++- .../flink/table/runtime/utils/PythonTestUtils.java | 19 ++- 37 files changed, 1052 insertions(+), 816 deletions(-) delete mode 100644 flink-python/pyflink/fn_execution/datastream/output_handler.py create mode 100644 flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/collector/RunnerOutputCollector.java create mode 100644 flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerHandler.java rename flink-python/src/main/java/org/apache/flink/streaming/api/{utils/output/OutputWithTimerRowHandler.java => operators/python/timer/TimerRegistration.java} (53%) create mode 100644 flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java delete mode 100644 flink-python/src/main/java/org/apache/flink/streaming/api/utils/input/KeyedInputWithTimerRowFactory.java delete mode 100644 flink-python/src/main/java/org/apache/flink/streaming/api/utils/input/KeyedTwoInputWithTimerRowFactory.java delete mode 100644 flink-python/src/main/java/org/apache/flink/streaming/api/utils/input/TimerType.java delete mode 100644 
flink-python/src/main/java/org/apache/flink/streaming/api/utils/output/TimerOperandType.java
[flink] 02/03: [FLINK-23401][python] Use ParDoPayload to represent the user-defined functions in Python DataStream API
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 9df4b7608aea73d7b2ad3fc6aeee559a84e58c0b Author: Dian Fu AuthorDate: Fri Jul 2 16:03:23 2021 +0800 [FLINK-23401][python] Use ParDoPayload to represent the user-defined functions in Python DataStream API This closes #16541. --- .../pyflink/fn_execution/beam/beam_operations.py | 36 +- .../beam/BeamDataStreamPythonFunctionRunner.java | 24 --- 2 files changed, 40 insertions(+), 20 deletions(-) diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations.py b/flink-python/pyflink/fn_execution/beam/beam_operations.py index 6961d7c..04fc5d4 100644 --- a/flink-python/pyflink/fn_execution/beam/beam_operations.py +++ b/flink-python/pyflink/fn_execution/beam/beam_operations.py @@ -15,8 +15,10 @@ # See the License for the specific language governing permissions and # limitations under the License. - +from apache_beam.portability import common_urns +from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.runners.worker import bundle_processor, operation_specs +from apache_beam.utils import proto_utils from pyflink.fn_execution import flink_fn_execution_pb2 from pyflink.fn_execution.coders import from_proto, from_type_info_proto, TimeWindowCoder, \ @@ -113,25 +115,29 @@ def create_pandas_over_window_aggregate_function( table_operations.PandasBatchOverWindowAggregateFunctionOperation) -@bundle_processor.BeamTransformFactory.register_urn( -datastream_operations.DATA_STREAM_STATELESS_FUNCTION_URN, -flink_fn_execution_pb2.UserDefinedDataStreamFunction) -def create_data_stream_function(factory, transform_id, transform_proto, parameter, consumers): -return _create_user_defined_function_operation( -factory, transform_proto, consumers, parameter, -beam_operations.StatelessFunctionOperation, -datastream_operations.StatelessOperation) +# - DataStream 
@bundle_processor.BeamTransformFactory.register_urn( -datastream_operations.DATA_STREAM_STATEFUL_FUNCTION_URN, -flink_fn_execution_pb2.UserDefinedDataStreamFunction) +common_urns.primitives.PAR_DO.urn, beam_runner_api_pb2.ParDoPayload) def create_data_stream_keyed_process_function(factory, transform_id, transform_proto, parameter, consumers): -return _create_user_defined_function_operation( -factory, transform_proto, consumers, parameter, -beam_operations.StatefulFunctionOperation, -datastream_operations.StatefulOperation) +urn = parameter.do_fn.urn +payload = proto_utils.parse_Bytes( +parameter.do_fn.payload, flink_fn_execution_pb2.UserDefinedDataStreamFunction) +if urn == datastream_operations.DATA_STREAM_STATELESS_FUNCTION_URN: +return _create_user_defined_function_operation( +factory, transform_proto, consumers, payload, +beam_operations.StatelessFunctionOperation, +datastream_operations.StatelessOperation) +else: +return _create_user_defined_function_operation( +factory, transform_proto, consumers, payload, +beam_operations.StatefulFunctionOperation, +datastream_operations.StatefulOperation) + + +# - Utilities def _create_user_defined_function_operation(factory, transform_proto, consumers, udfs_proto, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java index a9f5b91..defb696 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.util.Preconditions; import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.BeamUrns; import 
javax.annotation.Nullable; @@ -83,11 +84,10 @@ public class BeamDataStreamPythonFunctionRunner extends BeamPythonFunctionRunner @Override protected Map getTransforms() { -return Collections.singletonMap( -TRANSFORM_ID, -RunnerApi.PTransform.newBuilder() -.setUniqueName(TRANSFORM_ID) -.setSpec( +// Use ParDoPayload as a wrapper of the actual payload as timer is only supported in ParDo +RunnerApi.ParDoPayload payload = +
[flink] branch master updated (b34ef1e -> be82c54)
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from b34ef1e [FLINK-18783] Load Akka with separate classloader
     add cc5c9c0 [hotfix][tests] Fix raw generics in OperatorEventSendingCheckpointITCase
     add 798003c [hotfix][coordination] Reduce log level for suppressed failures from WARN to DEBUG.
     add be82c54 [hotfix][coordination] Provide richer exception message for suppressed exceptions durnig coordinator failover

No new revisions were added by this update.

Summary of changes:
 .../coordination/OperatorCoordinatorHolder.java | 17 +
 .../OperatorEventSendingCheckpointITCase.java | 9 ++---
 2 files changed, 15 insertions(+), 11 deletions(-)
[flink] branch benchmark-request updated (cffa1ae -> 36e0224)
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch benchmark-request
in repository https://gitbox.apache.org/repos/asf/flink.git.

 discard cffa1ae Bump lz4-java to 1.8.0
     add ed39fb2 [FLINK-23395] Bump Okhttp to 3.14.9
     new a2ad139 [FLINK-23329][build] Bump flink-shaded to 14.0
     new 36e0224 Adjust guava package

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (cffa1ae)
            \
             N -- N -- N   refs/heads/benchmark-request (36e0224)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes: .../application/JarManifestParserTest.java | 4 +-- .../cassandra/CassandraInputFormatBase.java| 2 +- .../source/HBaseRowDataAsyncLookupFunction.java| 4 +-- .../connector/hbase2/HBaseConnectorITCase.java | 2 +- .../HBaseRowDataAsyncLookupFunctionTest.java | 2 +- .../hbase/source/HBaseRowDataLookupFunction.java | 4 +-- .../connectors/hive/FlinkEmbeddedHiveRunner.java | 2 +- .../connectors/hive/FlinkStandaloneHiveRunner.java | 4 +-- .../connector/jdbc/table/JdbcLookupFunction.java | 4 +-- .../jdbc/table/JdbcRowDataLookupFunction.java | 4 +-- .../connectors/kafka/FlinkKafkaProducer.java | 2 +- .../internals/FlinkKafkaInternalProducer.java | 2 +- .../kafka/FlinkKafkaInternalProducerITCase.java| 2 +- .../connectors/kafka/KafkaConsumerTestBase.java| 2 +- .../kafka/shuffle/KafkaShuffleITCase.java | 4 +-- .../kinesis/testutils/KinesisPubsubClient.java | 2 +- ...inesisDataFetcherForShardConsumerException.java | 2 +- .../GuavaFlinkConnectorRateLimiter.java| 2 +- .../apache/flink/configuration/CoreOptions.java| 4 +-- .../core/classloading/ComponentClassLoader.java| 2 +- .../java/org/apache/flink/core/fs/FileSystem.java | 8 +++--- .../flink/core/plugin/DefaultPluginManager.java| 2 +- .../main/java/org/apache/flink/util/FileUtils.java | 4 +-- .../flink/configuration/ConfigOptionTest.java | 2 +- .../apache/flink/core/plugin/PluginConfigTest.java | 2 +- flink-dist/pom.xml | 2 +- flink-dist/src/main/resources/META-INF/NOTICE | 2 +- .../org/apache/flink/dist/BashJavaUtilsITCase.java | 2 +- .../flink-end-to-end-tests-common/pom.xml | 1 - .../flink-glue-schema-registry-test/pom.xml| 2 ++ .../flink-metrics-reporter-prometheus-test/pom.xml | 1 - .../tests/queryablestate/QsStateProducer.java | 2 +- .../kinesis/test/KinesisTableApiITCase.java| 2 +- .../flink-avro-glue-schema-registry/pom.xml| 2 ++ flink-kubernetes/pom.xml | 18 +--- .../decorators/FlinkConfMountDecorator.java| 2 +- .../decorators/KerberosMountDecorator.java | 2 +- 
.../decorators/PodTemplateMountDecorator.java | 2 +- .../src/main/resources/META-INF/NOTICE | 6 ++-- .../flink/kubernetes/KubernetesTestUtils.java | 2 +- .../flink/cep/nfa/sharedbuffer/SharedBuffer.java | 2 +- .../apache/flink/cep/nfa/AfterMatchSkipITCase.java | 2 +- .../org/apache/flink/cep/nfa/GreedyITCase.java | 2 +- .../java/org/apache/flink/cep/nfa/GroupITCase.java | 2 +- .../flink/cep/nfa/IterativeConditionsITCase.java | 2 +- .../java/org/apache/flink/cep/nfa/NFAITCase.java | 2 +- .../org/apache/flink/cep/nfa/NotPatternITCase.java | 2 +- .../apache/flink/cep/nfa/SameElementITCase.java| 4 +-- .../apache/flink/cep/nfa/TimesOrMoreITCase.java| 2 +- .../org/apache/flink/cep/nfa/TimesRangeITCase.java | 2 +- .../apache/flink/cep/nfa/UntilConditionITCase.java | 2 +- .../flink/cep/nfa/compiler/NFACompilerTest.java| 2 +- .../apache/flink/cep/operator/CEPOperatorTest.java | 2 +- .../apache/flink/cep/utils/TestSharedBuffer.java | 4 +-- .../state/api/runtime/OperatorIDGenerator.java | 2 +- flink-metrics/flink-metrics-datadog/pom.xml| 10 ++- .../src/main/resources/META-INF/NOTICE | 4 +-- .../src/main/resources/META-INF/NOTICE | 4 +-- .../flink/optimizer/dag/SingleInputNode.java | 2 +- .../apache/flink/optimizer/dag/TwoInputNode.java | 2 +-
[flink] 01/02: [FLINK-23329][build] Bump flink-shaded to 14.0
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch benchmark-request in repository https://gitbox.apache.org/repos/asf/flink.git commit a2ad139d0bf0dd62a57b8563888e73f794e0b1f2 Author: Chesnay Schepler AuthorDate: Wed Jul 7 12:05:53 2021 +0200 [FLINK-23329][build] Bump flink-shaded to 14.0 --- flink-dist/pom.xml | 2 +- .../java/org/apache/flink/cep/utils/TestSharedBuffer.java | 4 +--- .../flink/yarn/YARNSessionCapacitySchedulerITCase.java | 2 +- pom.xml| 14 +++--- 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index 0355f27..b88b92b 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -34,7 +34,7 @@ under the License. jar - 3.5.6 + 3.5.9 diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java index e133c44..8a5df5b 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java @@ -32,8 +32,6 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer; -import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators; - import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -207,7 +205,7 @@ public class TestSharedBuffer extends SharedBuffer { @Override public Iterator> iterator() throws Exception { if (values == null) { -return Iterators.emptyIterator(); +return Collections.emptyIterator(); } return new CountingIterator<>(values.entrySet().iterator()); diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java index e5ae45d..2f3e5dd 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java @@ -322,7 +322,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { try { final String logs = outContent.toString(); final HostAndPort hostAndPort = parseJobManagerHostname(logs); -final String host = hostAndPort.getHostText(); +final String host = hostAndPort.getHost(); final int port = hostAndPort.getPort(); LOG.info("Extracted hostname:port: {}:{}", host, port); diff --git a/pom.xml b/pom.xml index c2dbb32..0068d38 100644 --- a/pom.xml +++ b/pom.xml @@ -100,7 +100,8 @@ under the License. to avoid process kills due to container limits on TravisCI --> ${flink.forkCount} true - 13.0 + 14.0 + 2.12.4 18.0 2.5.21 1.8 @@ -120,7 +121,6 @@ under the License. 3.4.14 2.12.0 - 2.12.1 0.8.1 1.10.0 1.2.0 @@ -281,31 +281,31 @@ under the License. org.apache.flink flink-shaded-guava - 18.0-${flink.shaded.version} + 30.1.1-jre-${flink.shaded.version} org.apache.flink flink-shaded-jackson - ${jackson.version}-${flink.shaded.version} + ${flink.shaded.jackson.version}-${flink.shaded.version} org.apache.flink flink-shaded-jackson-module-jsonSchema - ${jackson.version}-${flink.shaded.version} + ${flink.shaded.jackson.version}-${flink.shaded.version} org.apache.flink flink-shaded-netty - 4.1.49.Final-${flink.shaded.version} +
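The TestSharedBuffer hunk above drops the shaded Guava import and returns the JDK's `Collections.emptyIterator()` in place of `Iterators.emptyIterator()`. A minimal sketch of that pattern follows, using only `java.util`; the class name `EmptyIteratorSketch` and its `String`/`Integer` type parameters are hypothetical stand-ins and are not taken from the Flink source.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical stand-in for a holder whose backing map may be null. */
public class EmptyIteratorSketch {

    // May be null, mirroring the null check in the diff above.
    private final Map<String, Integer> values;

    public EmptyIteratorSketch(Map<String, Integer> values) {
        this.values = values;
    }

    /** Returns the JDK's shared empty iterator when nothing is stored. */
    public Iterator<Map.Entry<String, Integer>> iterator() {
        if (values == null) {
            // JDK replacement for the Guava call; no third-party import needed.
            return Collections.emptyIterator();
        }
        return values.entrySet().iterator();
    }

    public static void main(String[] args) {
        // No backing map: the iterator has no elements.
        System.out.println(new EmptyIteratorSketch(null).iterator().hasNext());

        // One entry: the iterator has a first element.
        Map<String, Integer> stored = new HashMap<>();
        stored.put("a", 1);
        System.out.println(new EmptyIteratorSketch(stored).iterator().hasNext());
    }
}
```

Relying on the JDK method keeps the call site independent of the shaded Guava package name, which changes whenever the bundled Guava version is bumped.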
[flink] branch master updated (f9286a5 -> b34ef1e)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from f9286a5 [FLINK-23429][state-processor-api] Use Path instead of Path.getPath() to preserve FileSystem info add b34ef1e [FLINK-18783] Load Akka with separate classloader No new revisions were added by this update. Summary of changes: .../resource-providers/standalone/kubernetes.md| 4 +- .../resource-providers/standalone/kubernetes.md| 4 +- flink-clients/pom.xml | 6 +- flink-connectors/flink-connector-base/pom.xml | 2 +- flink-connectors/flink-connector-cassandra/pom.xml | 2 +- .../flink-connector-elasticsearch-base/pom.xml | 2 +- .../flink-connector-elasticsearch5/pom.xml | 2 +- .../flink-connector-elasticsearch6/pom.xml | 2 +- .../flink-connector-elasticsearch7/pom.xml | 2 +- flink-connectors/flink-connector-files/pom.xml | 2 +- .../flink-connector-gcp-pubsub/pom.xml | 2 +- flink-connectors/flink-connector-hbase-1.4/pom.xml | 2 +- flink-connectors/flink-connector-hive/pom.xml | 2 +- flink-connectors/flink-connector-jdbc/pom.xml | 2 +- flink-connectors/flink-connector-kafka/pom.xml | 4 +- flink-connectors/flink-connector-kinesis/pom.xml | 4 +- flink-connectors/flink-connector-rabbitmq/pom.xml | 4 +- flink-container/pom.xml| 2 +- .../apache/flink/configuration/CoreOptions.java| 3 +- .../core/classloading/SubmoduleClassLoader.java| 45 +++ .../apache/flink/util/concurrent/FutureUtils.java | 26 +- flink-dist/pom.xml | 11 +- flink-dist/src/main/assemblies/opt.xml | 4 +- .../src/main/flink-bin/conf/log4j-cli.properties | 2 +- .../main/flink-bin/conf/log4j-console.properties | 2 +- .../main/flink-bin/conf/log4j-session.properties | 2 +- .../src/main/flink-bin/conf/log4j.properties | 2 +- .../src/main/flink-bin/conf/logback-console.xml| 2 +- flink-dist/src/main/flink-bin/conf/logback.xml | 2 +- flink-dist/src/main/resources/META-INF/NOTICE | 12 - flink-docs/pom.xml | 4 +- .../pom.xml| 2 +- 
.../flink-end-to-end-tests-common/pom.xml | 2 +- .../flink-metrics-availability-test/pom.xml| 2 +- flink-end-to-end-tests/test-scripts/common.sh | 4 +- flink-examples/flink-examples-streaming/pom.xml| 2 +- flink-examples/flink-examples-table/pom.xml| 2 +- flink-formats/flink-avro/pom.xml | 2 +- flink-formats/flink-compress/pom.xml | 2 +- flink-formats/flink-csv/pom.xml| 2 +- flink-formats/flink-hadoop-bulk/pom.xml| 2 +- flink-formats/flink-json/pom.xml | 2 +- flink-formats/flink-orc/pom.xml| 2 +- flink-formats/flink-parquet/pom.xml| 2 +- flink-formats/flink-sequence-file/pom.xml | 2 +- flink-fs-tests/pom.xml | 4 +- flink-kubernetes/pom.xml | 6 +- flink-libraries/flink-cep/pom.xml | 4 +- flink-libraries/flink-gelly/pom.xml| 2 +- flink-libraries/flink-state-processing-api/pom.xml | 4 +- flink-metrics/flink-metrics-influxdb/pom.xml | 2 +- flink-metrics/flink-metrics-jmx/pom.xml| 2 +- flink-metrics/flink-metrics-prometheus/pom.xml | 4 +- flink-optimizer/pom.xml| 6 +- flink-python/pom.xml | 2 +- .../flink-queryable-state-runtime/pom.xml | 8 +- flink-rpc/flink-rpc-akka-loader/pom.xml| 127 +++ .../runtime/rpc/akka/AkkaRpcSystemLoader.java | 78 + .../runtime/rpc/akka/CleanupOnCloseRpcSystem.java} | 61 +++- .../org.apache.flink.runtime.rpc.RpcSystemLoader | 2 +- flink-rpc/flink-rpc-akka/pom.xml | 176 ++ .../akka/ActorSystemScheduledExecutorAdapter.java | 22 +- .../runtime/concurrent/akka/ClassLoadingUtils.java | 119 +++ .../runtime/rpc/akka/AkkaInvocationHandler.java| 12 +- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 51 ++- .../flink/runtime/rpc/akka/AkkaRpcService.java | 100 -- .../runtime/rpc/akka/AkkaRpcServiceUtils.java | 64 ++-- .../rpc/akka/FencedAkkaInvocationHandler.java | 6 +- .../flink/runtime/rpc/akka/FencedAkkaRpcActor.java | 5 +- .../src/main/resources/META-INF/NOTICE | 31 +- .../apache/flink/runtime/rpc/akka/AkkaUtils.scala | 16 +- .../runtime/rpc/akka/CustomSSLEngineProvider.scala | 3 +- .../concurrent/akka/ClassLoadingUtilsTest.java | 105 ++ 
.../rpc/akka/ContextClassLoadingSettingTest.java | 381 +
[flink] branch release-1.12 updated (600ce81 -> 909a6a1)
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a change to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 600ce81 [FLINK-23419][streaming] Fixed the condition in testNextFirstCheckpointBarrierOvertakesCancellationBarrier in order to make the test more stable (#16534)
     add 909a6a1 [FLINK-23429][state-processor-api] Use Path instead of Path.getPath() to preserve FileSystem info

No new revisions were added by this update.

Summary of changes:
 .../org/apache/flink/state/api/output/FileCopyFunction.java | 5 ++---
 .../apache/flink/state/api/output/StatePathExtractor.java | 12 ++--
 2 files changed, 8 insertions(+), 9 deletions(-)
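The FLINK-23429 subject line says that passing a `Path` rather than `Path.getPath()` preserves file system information. The sketch below illustrates the general effect with `java.net.URI` from the JDK as a stand-in; it does not use Flink's `Path` class, and the class name `PathInfoSketch`, its method names, and the example location are all hypothetical. It assumes that the path component of a location carries no scheme or authority, which is how `URI.getPath()` behaves.

```java
import java.net.URI;

/** Hypothetical illustration: the path component alone loses scheme and authority. */
public class PathInfoSketch {

    /** Keeps only the path component; the file system identity is dropped. */
    public static String pathOnly(URI location) {
        return location.getPath();
    }

    /** Keeps the whole location, including scheme and authority. */
    public static String fullLocation(URI location) {
        return location.toString();
    }

    public static void main(String[] args) {
        // Made-up savepoint location on a remote file system.
        URI savepoint = URI.create("hdfs://namenode:8020/savepoints/sp-1/_metadata");

        System.out.println(pathOnly(savepoint));     // /savepoints/sp-1/_metadata
        System.out.println(fullLocation(savepoint)); // hdfs://namenode:8020/savepoints/sp-1/_metadata
    }
}
```

Once only the path string is kept, a later lookup has to guess the file system, typically falling back to a default, which is the kind of loss the commit subject describes.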
[flink-benchmarks] branch master updated (12b87f8 -> e58adbf)
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git.

    from 12b87f8  [FLINK-23208] Add a benchmark for processing timers
     add e58adbf  [hotfix] Improve README

No new revisions were added by this update.

Summary of changes:
 README.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
[flink] branch release-1.13 updated: [FLINK-23429][state-processor-api] Use Path instead of Path.getPath() to preserve FileSystem info
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.13 by this push:
     new dd2945d  [FLINK-23429][state-processor-api] Use Path instead of Path.getPath() to preserve FileSystem info
dd2945d is described below

commit dd2945d73849a3bedf4f390a5ea8541f9815ce6e
Author: Jun Qin <11677043+qinjunje...@users.noreply.github.com>
AuthorDate: Wed Jul 21 08:54:52 2021 +0200

    [FLINK-23429][state-processor-api] Use Path instead of Path.getPath() to preserve FileSystem info

    This closes #16550
---
 .../org/apache/flink/state/api/output/FileCopyFunction.java  |  5 ++---
 .../apache/flink/state/api/output/StatePathExtractor.java    | 12 ++++++------
 2 files changed, 8 insertions(+), 9 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java
index 9b15222..f761061 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java
@@ -32,7 +32,7 @@ import java.io.IOException;
 
 /** This output format copies files from an existing savepoint into a new directory. */
 @Internal
-public final class FileCopyFunction implements OutputFormat<String> {
+public final class FileCopyFunction implements OutputFormat<Path> {
 
     private static final long serialVersionUID = 1L;
 
@@ -52,8 +52,7 @@ public final class FileCopyFunction implements OutputFormat<String> {
     }
 
     @Override
-    public void writeRecord(String record) throws IOException {
-        Path sourcePath = new Path(record);
+    public void writeRecord(Path sourcePath) throws IOException {
         Path destPath = new Path(path, sourcePath.getName());
         try (FSDataOutputStream os =
                         destPath.getFileSystem()
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java
index b7c260b..b9107a5 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java
@@ -34,18 +34,18 @@ import javax.annotation.Nullable;
 
 /** Extracts all file paths that are part of the provided {@link OperatorState}. */
 @Internal
-public class StatePathExtractor implements FlatMapFunction<OperatorState, String> {
+public class StatePathExtractor implements FlatMapFunction<OperatorState, Path> {
 
     private static final long serialVersionUID = 1L;
 
     @Override
-    public void flatMap(OperatorState operatorState, Collector<String> out) throws Exception {
+    public void flatMap(OperatorState operatorState, Collector<Path> out) throws Exception {
         for (OperatorSubtaskState subTaskState : operatorState.getSubtaskStates().values()) {
             // managed operator state
             for (OperatorStateHandle operatorStateHandle : subTaskState.getManagedOperatorState()) {
                 Path path = getStateFilePathFromStreamStateHandle(operatorStateHandle);
                 if (path != null) {
-                    out.collect(path.getPath());
+                    out.collect(path);
                 }
             }
             // managed keyed state
@@ -55,7 +55,7 @@ public class StatePathExtractor implements FlatMapFunction
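The point of this change can be illustrated without Flink on the classpath. The sketch below is an analogy only: it uses `java.net.URI` as a stand-in for Flink's `Path` (which wraps a URI), and the class name, method names, and the `s3://bucket/...` location are all hypothetical. It shows why forwarding only the path component as a `String` drops the scheme and authority, so the receiver can no longer tell which file system the file lives on.

```java
import java.net.URI;

/** Illustration only: java.net.URI stands in for Flink's Path. Names here are hypothetical. */
final class PathSchemeSketch {

    /** Mimics the old behaviour: forward only the path component as a String. */
    static String pathComponentOnly(String location) {
        return URI.create(location).getPath();
    }

    /** Mimics the new behaviour: forward the full location, scheme and authority included. */
    static URI fullLocation(String location) {
        return URI.create(location);
    }

    public static void main(String[] args) {
        String stateFile = "s3://bucket/savepoints/sp-1/state-file"; // hypothetical location

        // Only the path survives; re-parsing it yields no scheme at all.
        String stripped = pathComponentOnly(stateFile);
        System.out.println(stripped + " scheme=" + URI.create(stripped).getScheme());

        // The full location still carries the scheme needed to pick the right file system.
        URI kept = fullLocation(stateFile);
        System.out.println(kept + " scheme=" + kept.getScheme());
    }
}
```

In the commit itself the same idea is applied by changing the record type from `String` to `Path`, so that `FileCopyFunction` receives the original object rather than rebuilding one from a bare path string.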
[flink] branch master updated: [FLINK-23429][state-processor-api] Use Path instead of Path.getPath() to preserve FileSystem info
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new f9286a5  [FLINK-23429][state-processor-api] Use Path instead of Path.getPath() to preserve FileSystem info
f9286a5 is described below

commit f9286a54ded7c34d7bf49146a93ee86cc701ac9b
Author: Jun Qin <11677043+qinjunje...@users.noreply.github.com>
AuthorDate: Tue Jul 20 10:42:51 2021 +0200

    [FLINK-23429][state-processor-api] Use Path instead of Path.getPath() to preserve FileSystem info

    This closes #16542
---
 .../org/apache/flink/state/api/output/FileCopyFunction.java  |  5 ++---
 .../apache/flink/state/api/output/StatePathExtractor.java    | 12 ++++++------
 2 files changed, 8 insertions(+), 9 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java
index 9b15222..f761061 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java
@@ -32,7 +32,7 @@ import java.io.IOException;
 
 /** This output format copies files from an existing savepoint into a new directory. */
 @Internal
-public final class FileCopyFunction implements OutputFormat<String> {
+public final class FileCopyFunction implements OutputFormat<Path> {
 
     private static final long serialVersionUID = 1L;
 
@@ -52,8 +52,7 @@ public final class FileCopyFunction implements OutputFormat<String> {
     }
 
     @Override
-    public void writeRecord(String record) throws IOException {
-        Path sourcePath = new Path(record);
+    public void writeRecord(Path sourcePath) throws IOException {
         Path destPath = new Path(path, sourcePath.getName());
         try (FSDataOutputStream os =
                         destPath.getFileSystem()
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java
index b7c260b..b9107a5 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java
@@ -34,18 +34,18 @@ import javax.annotation.Nullable;
 
 /** Extracts all file paths that are part of the provided {@link OperatorState}. */
 @Internal
-public class StatePathExtractor implements FlatMapFunction<OperatorState, String> {
+public class StatePathExtractor implements FlatMapFunction<OperatorState, Path> {
 
     private static final long serialVersionUID = 1L;
 
     @Override
-    public void flatMap(OperatorState operatorState, Collector<String> out) throws Exception {
+    public void flatMap(OperatorState operatorState, Collector<Path> out) throws Exception {
         for (OperatorSubtaskState subTaskState : operatorState.getSubtaskStates().values()) {
             // managed operator state
             for (OperatorStateHandle operatorStateHandle : subTaskState.getManagedOperatorState()) {
                 Path path = getStateFilePathFromStreamStateHandle(operatorStateHandle);
                 if (path != null) {
-                    out.collect(path.getPath());
+                    out.collect(path);
                 }
             }
             // managed keyed state
@@ -55,7 +55,7 @@ public class StatePathExtractor implements FlatMapFunction
[flink] branch master updated (3137184 -> 7083bb7)
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from 3137184  [FLINK-23402][streaming-java] Mark GlobalStreamExchangeMode as @Internal
     add 7083bb7  [hotfix][docs] fix image rendering on local installation

No new revisions were added by this update.

Summary of changes:
 docs/content/docs/try-flink/local_installation.md | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
[flink] 02/04: [FLINK-23402][streaming-java] Fix minor code issues around 'shuffle mode'
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 86f54c89c7866647e50d3957026bd0d28869ea8d
Author: Timo Walther
AuthorDate: Mon Jul 19 15:02:38 2021 +0200

    [FLINK-23402][streaming-java] Fix minor code issues around 'shuffle mode'
---
 .../flink/streaming/api/graph/StreamEdge.java      |  2 ++
 .../transformations/PartitionTransformation.java   | 20 ++--
 .../api/transformations/StreamExchangeMode.java    |  9 +++---
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 36 +++---
 ...aphGeneratorWithGlobalDataExchangeModeTest.java |  2 +-
 .../plan/nodes/exec/batch/BatchExecExchange.java   |  2 +-
 .../utils/InputPriorityConflictResolver.java       |  2 +-
 .../planner/utils/StreamExchangeModeUtils.java     |  2 +-
 8 files changed, 39 insertions(+), 36 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index 5ca627c..84c4110 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -194,6 +194,8 @@ public class StreamEdge implements Serializable {
                 + typeNumber
                 + ", outputPartitioner="
                 + outputPartitioner
+                + ", exchangeMode="
+                + exchangeMode
                 + ", bufferTimeout="
                 + bufferTimeout
                 + ", outputTag="
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
index 2521539..bc334d1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
@@ -35,7 +35,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * <p>This does not create a physical operation, it only affects how upstream operations are
  * connected to downstream operations.
  *
- * @param <T> The type of the elements that result from this {@code PartitionTransformation}
+ * @param <T> The type of the elements that result from this {@link PartitionTransformation}
  */
 @Internal
 public class PartitionTransformation<T> extends Transformation<T> {
@@ -47,23 +47,23 @@ public class PartitionTransformation<T> extends Transformation<T> {
     private final StreamExchangeMode exchangeMode;
 
     /**
-     * Creates a new {@code PartitionTransformation} from the given input and {@link
+     * Creates a new {@link PartitionTransformation} from the given input and {@link
      * StreamPartitioner}.
      *
-     * @param input The input {@code Transformation}
-     * @param partitioner The {@code StreamPartitioner}
+     * @param input The input {@link Transformation}
+     * @param partitioner The {@link StreamPartitioner}
      */
     public PartitionTransformation(Transformation<T> input, StreamPartitioner<T> partitioner) {
         this(input, partitioner, StreamExchangeMode.UNDEFINED);
     }
 
     /**
-     * Creates a new {@code PartitionTransformation} from the given input and {@link
+     * Creates a new {@link PartitionTransformation} from the given input and {@link
      * StreamPartitioner}.
      *
-     * @param input The input {@code Transformation}
-     * @param partitioner The {@code StreamPartitioner}
-     * @param exchangeMode The {@code ShuffleMode}
+     * @param input The input {@link Transformation}
+     * @param partitioner The {@link StreamPartitioner}
+     * @param exchangeMode The {@link StreamExchangeMode}
      */
     public PartitionTransformation(
             Transformation<T> input,
@@ -76,8 +76,8 @@ public class PartitionTransformation<T> extends Transformation<T> {
     }
 
     /**
-     * Returns the {@code StreamPartitioner} that must be used for partitioning the elements of the
-     * input {@code Transformation}.
+     * Returns the {@link StreamPartitioner} that must be used for partitioning the elements of the
+     * input {@link Transformation}.
      */
     public StreamPartitioner<T> getPartitioner() {
         return partitioner;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java
index 4637c07..5752add 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java
+++
[flink] branch master updated (9e0e092 -> 3137184)
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 9e0e092 [FLINK-23407][docs] Use DescribedEnum for existing config option enums new 4e65322 [FLINK-23402][streaming-java] Refactor ShuffleMode to StreamExchangeMode new 86f54c8 [FLINK-23402][streaming-java] Fix minor code issues around 'shuffle mode' new 156f517 [FLINK-23402][streaming-java] Refactor GlobalDataExchangeMode to GlobalStreamExchangeMode new 3137184 [FLINK-23402][streaming-java] Mark GlobalStreamExchangeMode as @Internal The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../apache/flink/table/tpcds/TpcdsTestProgram.java | 4 +- .../environment/StreamExecutionEnvironment.java| 5 +- ...angeMode.java => GlobalStreamExchangeMode.java} | 8 ++-- .../flink/streaming/api/graph/StreamEdge.java | 20 .../flink/streaming/api/graph/StreamGraph.java | 33 ++--- .../streaming/api/graph/StreamGraphGenerator.java | 8 ++-- .../api/graph/StreamingJobGraphGenerator.java | 12 ++--- .../transformations/PartitionTransformation.java | 36 +++--- .../{ShuffleMode.java => StreamExchangeMode.java} | 15 +++--- .../PartitionTransformationTranslator.java | 2 +- ...amGraphGeneratorExecutionModeDetectionTest.java | 25 +- .../api/graph/StreamingJobGraphGeneratorTest.java | 54 ++--- ...GeneratorWithGlobalStreamExchangeModeTest.java} | 30 ++-- .../table/planner/plan/nodes/exec/ExecEdge.java| 31 ++-- .../plan/nodes/exec/batch/BatchExecExchange.java | 38 +++ .../exec/processor/DeadlockBreakupProcessor.java | 4 +- .../MultipleInputNodeCreationProcessor.java| 4 +- .../utils/InputPriorityConflictResolver.java | 21 .../exec/serde/ExecNodeGraphJsonPlanGenerator.java | 18 +++ .../flink/table/planner/utils/ExecutorUtils.java | 8 
++-- ...ModeUtils.java => StreamExchangeModeUtils.java} | 14 +++--- .../physical/batch/BatchPhysicalExchange.scala | 16 +++ .../utils/InputPriorityConflictResolverTest.java | 19 ...sTest.java => StreamExchangeModeUtilsTest.java} | 56 +++--- .../planner/runtime/utils/BatchTestBase.scala | 4 +- .../flink/table/planner/utils/TableTestBase.scala | 4 +- .../flink/test/runtime/BlockingShuffleITCase.java | 4 +- 27 files changed, 255 insertions(+), 238 deletions(-) rename flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/{GlobalDataExchangeMode.java => GlobalStreamExchangeMode.java} (89%) rename flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/{ShuffleMode.java => StreamExchangeMode.java} (72%) rename flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/{StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.java => StreamingJobGraphGeneratorWithGlobalStreamExchangeModeTest.java} (87%) rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/{ShuffleModeUtils.java => StreamExchangeModeUtils.java} (79%) rename flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/{ShuffleModeUtilsTest.java => StreamExchangeModeUtilsTest.java} (57%)
[flink] 03/04: [FLINK-23402][streaming-java] Refactor GlobalDataExchangeMode to GlobalStreamExchangeMode
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 156f517d387202ac292bde5bfac423a23908b7a2 Author: Timo Walther AuthorDate: Mon Jul 19 16:14:51 2021 +0200 [FLINK-23402][streaming-java] Refactor GlobalDataExchangeMode to GlobalStreamExchangeMode --- .../apache/flink/table/tpcds/TpcdsTestProgram.java | 4 +- .../environment/StreamExecutionEnvironment.java| 5 ++- ...angeMode.java => GlobalStreamExchangeMode.java} | 2 +- .../flink/streaming/api/graph/StreamGraph.java | 10 ++--- .../streaming/api/graph/StreamGraphGenerator.java | 8 ++-- .../api/graph/StreamingJobGraphGenerator.java | 4 +- ...amGraphGeneratorExecutionModeDetectionTest.java | 25 ++- ...GeneratorWithGlobalStreamExchangeModeTest.java} | 22 +- .../plan/nodes/exec/batch/BatchExecExchange.java | 4 +- .../flink/table/planner/utils/ExecutorUtils.java | 8 ++-- .../planner/utils/StreamExchangeModeUtils.java | 10 ++--- .../physical/batch/BatchPhysicalExchange.scala | 4 +- .../planner/utils/StreamExchangeModeUtilsTest.java | 48 +++--- .../planner/runtime/utils/BatchTestBase.scala | 4 +- .../flink/table/planner/utils/TableTestBase.scala | 4 +- .../flink/test/runtime/BlockingShuffleITCase.java | 4 +- 16 files changed, 85 insertions(+), 81 deletions(-) diff --git a/flink-end-to-end-tests/flink-tpcds-test/src/main/java/org/apache/flink/table/tpcds/TpcdsTestProgram.java b/flink-end-to-end-tests/flink-tpcds-test/src/main/java/org/apache/flink/table/tpcds/TpcdsTestProgram.java index 1ade3bb..3f46dc2 100644 --- a/flink-end-to-end-tests/flink-tpcds-test/src/main/java/org/apache/flink/table/tpcds/TpcdsTestProgram.java +++ b/flink-end-to-end-tests/flink-tpcds-test/src/main/java/org/apache/flink/table/tpcds/TpcdsTestProgram.java @@ -20,7 +20,7 @@ package org.apache.flink.table.tpcds; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.core.fs.FileSystem; -import 
org.apache.flink.streaming.api.graph.GlobalDataExchangeMode; +import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; @@ -145,7 +145,7 @@ public class TpcdsTestProgram { .getConfiguration() .setString( ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE, - GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED.toString()); + GlobalStreamExchangeMode.POINTWISE_EDGES_PIPELINED.toString()); tEnv.getConfig() .getConfiguration() .setLong( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 9670b39..8b0dd6a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -93,7 +93,7 @@ import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; -import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode; +import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; @@ -2110,7 +2110,8 @@ public class StreamExecutionEnvironment { if (configuration.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH && streamGraph.hasFineGrainedResource()) { if 
(configuration.get(ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING)) { - streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING); +streamGraph.setGlobalStreamExchangeMode( +GlobalStreamExchangeMode.ALL_EDGES_BLOCKING); } else { throw new IllegalConfigurationException( "At the moment, fine-grained resource management requires batch workloads to " diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalStreamExchangeMode.java similarity index 98% rename from
[flink] 04/04: [FLINK-23402][streaming-java] Mark GlobalStreamExchangeMode as @Internal
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 313718466d15b473bd5bf1dcf0d9d988e0fd5979
Author: Timo Walther
AuthorDate: Mon Jul 19 16:20:03 2021 +0200

    [FLINK-23402][streaming-java] Mark GlobalStreamExchangeMode as @Internal

    This closes #16536.
---
 .../org/apache/flink/streaming/api/graph/GlobalStreamExchangeMode.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalStreamExchangeMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalStreamExchangeMode.java
index 2befa4b..ee783e3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalStreamExchangeMode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalStreamExchangeMode.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.graph;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
@@ -27,6 +28,7 @@ import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
  * This mode decides the default {@link ResultPartitionType} of job edges. Note that this only
  * affects job edges which are {@link StreamExchangeMode#UNDEFINED}.
  */
+@Internal
 public enum GlobalStreamExchangeMode {
     /** Set all job edges to be {@link ResultPartitionType#BLOCKING}. */
     ALL_EDGES_BLOCKING,
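The Javadoc in this diff says the global mode only decides the partition type for edges whose own exchange mode is UNDEFINED. A minimal sketch of that "per-edge setting wins, global mode is the fallback" rule follows. All types here are simplified, hypothetical stand-ins (`EdgeMode`, `GlobalDefault`, `PartitionKind`, `resolve`); the real Flink enums have more constants and the real resolution lives in the job graph generator, which may differ in detail.

```java
/** Simplified stand-ins for illustration; these are not the Flink classes. */
final class ExchangeModeSketch {

    /** Hypothetical per-edge setting, loosely modelled on StreamExchangeMode. */
    enum EdgeMode { PIPELINED, BATCH, UNDEFINED }

    /** Hypothetical two-value subset of a global default. */
    enum GlobalDefault { ALL_EDGES_BLOCKING, ALL_EDGES_PIPELINED }

    /** Hypothetical result type, loosely modelled on ResultPartitionType. */
    enum PartitionKind { BLOCKING, PIPELINED }

    /** An explicit per-edge mode wins; the global default applies only to UNDEFINED edges. */
    static PartitionKind resolve(EdgeMode edge, GlobalDefault global) {
        switch (edge) {
            case PIPELINED:
                return PartitionKind.PIPELINED;
            case BATCH:
                return PartitionKind.BLOCKING;
            default: // UNDEFINED: fall back to the global default
                return global == GlobalDefault.ALL_EDGES_BLOCKING
                        ? PartitionKind.BLOCKING
                        : PartitionKind.PIPELINED;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(EdgeMode.UNDEFINED, GlobalDefault.ALL_EDGES_BLOCKING));
        System.out.println(resolve(EdgeMode.PIPELINED, GlobalDefault.ALL_EDGES_BLOCKING));
    }
}
```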
[flink] 01/04: [FLINK-23402][streaming-java] Refactor ShuffleMode to StreamExchangeMode
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 4e65322dc1b5f80a7f3a42f0f205f978357daa40 Author: Timo Walther AuthorDate: Mon Jul 19 14:22:18 2021 +0200 [FLINK-23402][streaming-java] Refactor ShuffleMode to StreamExchangeMode --- .../api/graph/GlobalDataExchangeMode.java | 4 +-- .../flink/streaming/api/graph/StreamEdge.java | 18 ++-- .../flink/streaming/api/graph/StreamGraph.java | 23 .../api/graph/StreamingJobGraphGenerator.java | 8 +++--- .../transformations/PartitionTransformation.java | 18 ++-- .../{ShuffleMode.java => StreamExchangeMode.java} | 6 ++-- .../PartitionTransformationTranslator.java | 2 +- .../api/graph/StreamingJobGraphGeneratorTest.java | 30 ++-- ...aphGeneratorWithGlobalDataExchangeModeTest.java | 8 +++--- .../table/planner/plan/nodes/exec/ExecEdge.java| 31 +++-- .../plan/nodes/exec/batch/BatchExecExchange.java | 32 +++--- .../exec/processor/DeadlockBreakupProcessor.java | 4 +-- .../MultipleInputNodeCreationProcessor.java| 4 +-- .../utils/InputPriorityConflictResolver.java | 21 +++--- .../exec/serde/ExecNodeGraphJsonPlanGenerator.java | 18 ++-- .../flink/table/planner/utils/ExecutorUtils.java | 2 +- ...ModeUtils.java => StreamExchangeModeUtils.java} | 2 +- .../physical/batch/BatchPhysicalExchange.scala | 12 .../utils/InputPriorityConflictResolverTest.java | 19 +++-- ...sTest.java => StreamExchangeModeUtilsTest.java} | 28 +-- 20 files changed, 149 insertions(+), 141 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java index b2b5f21..3118f7f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java @@ -19,13 +19,13 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.streaming.api.transformations.ShuffleMode; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner; /** * This mode decides the default {@link ResultPartitionType} of job edges. Note that this only - * affects job edges which are {@link ShuffleMode#UNDEFINED}. + * affects job edges which are {@link StreamExchangeMode#UNDEFINED}. */ public enum GlobalDataExchangeMode { /** Set all job edges to be {@link ResultPartitionType#BLOCKING}. */ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java index f76edd5..5ca627c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.transformations.ShuffleMode; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.util.OutputTag; @@ -58,7 +58,7 @@ public class StreamEdge implements Serializable { /** The name of the operator in the target vertex. 
*/ private final String targetOperatorName; -private final ShuffleMode shuffleMode; +private final StreamExchangeMode exchangeMode; private long bufferTimeout; @@ -78,7 +78,7 @@ public class StreamEdge implements Serializable { ALWAYS_FLUSH_BUFFER_TIMEOUT, outputPartitioner, outputTag, -ShuffleMode.UNDEFINED); +StreamExchangeMode.UNDEFINED); } public StreamEdge( @@ -87,7 +87,7 @@ public class StreamEdge implements Serializable { int typeNumber, StreamPartitioner outputPartitioner, OutputTag outputTag, -ShuffleMode shuffleMode) { +StreamExchangeMode exchangeMode) { this( sourceVertex, @@ -96,7 +96,7 @@ public class StreamEdge implements Serializable { sourceVertex.getBufferTimeout(), outputPartitioner, outputTag, -shuffleMode); +exchangeMode); }
[flink] branch master updated (3d1c11e -> 9e0e092)
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 3d1c11e [FLINK-20562][sql-client][table] Support explain details for EXPLAIN statement add cdf6174 [FLINK-23407][docs][core] Documented enums for config options add 9e0e092 [FLINK-23407][docs] Use DescribedEnum for existing config option enums No new revisions were added by this update. Summary of changes: .../generated/cluster_configuration.html | 8 +-- .../execution_checkpointing_configuration.html | 8 +-- .../generated/execution_config_configuration.html | 8 +-- .../generated/execution_configuration.html | 4 +- .../generated/expert_cluster_section.html | 8 +-- .../generated/expert_scheduling_section.html | 4 +- .../generated/influxdb_reporter_configuration.html | 8 +-- .../generated/job_manager_configuration.html | 4 +- .../generated/kubernetes_config_configuration.html | 8 +-- .../generated/pipeline_configuration.html | 4 +- .../rocksdb_configurable_configuration.html| 8 +-- .../generated/rocksdb_configuration.html | 4 +- .../generated/sql_client_configuration.html| 4 +- .../generated/state_backend_rocksdb_section.html | 4 +- .../generated/stream_pipeline_configuration.html | 4 +- .../generated/yarn_config_configuration.html | 4 +- .../apache/flink/api/common/ExecutionConfig.java | 25 +--- .../apache/flink/configuration/ClusterOptions.java | 50 ++- .../{WritableConfig.java => DescribedEnum.java}| 26 +--- .../flink/configuration/PipelineOptions.java | 16 + .../configuration/description/TextElement.java | 7 +++ .../configuration/ConfigOptionsDocGenerator.java | 72 +++--- .../ConfigOptionsDocGeneratorTest.java | 39 ++-- .../ConfigOptionsDocsCompletenessITCase.java | 9 +-- .../configuration/KubernetesConfigOptions.java | 4 +- .../state/EmbeddedRocksDBStateBackend.java | 37 --- .../contrib/streaming/state/RocksDBOptions.java| 18 +++--- .../flink/table/client/config/ResultMode.java | 28 ++--- 
.../table/client/config/SqlClientOptions.java | 6 +- .../test/checkpointing/TimersSavepointITCase.java | 5 +- .../flink/test/state/BackendSwitchSpecs.java | 7 ++- .../yarn/configuration/YarnConfigOptions.java | 34 +- 32 files changed, 295 insertions(+), 180 deletions(-) copy flink-core/src/main/java/org/apache/flink/configuration/{WritableConfig.java => DescribedEnum.java} (50%)
[flink-benchmarks] branch master updated: [FLINK-23208] Add a benchmark for processing timers
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git

The following commit(s) were added to refs/heads/master by this push:
     new 12b87f8  [FLINK-23208] Add a benchmark for processing timers
12b87f8 is described below

commit 12b87f86903e1d2268004cd4f295ddf8c1d33f0a
Author: Jiayi Liao
AuthorDate: Wed Jul 21 11:11:45 2021 +0800

    [FLINK-23208] Add a benchmark for processing timers
---
 .../flink/benchmark/ProcessingTimerBenchmark.java  | 123 +
 1 file changed, 123 insertions(+)

diff --git a/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java b/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java
new file mode 100644
index 000..dee911e
--- /dev/null
+++ b/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Collector;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.util.Random;
+
+@OperationsPerInvocation(value = ProcessingTimerBenchmark.PROCESSING_TIMERS_PER_INVOCATION)
+public class ProcessingTimerBenchmark extends BenchmarkBase {
+
+    public static final int PROCESSING_TIMERS_PER_INVOCATION = 1_000;
+
+    private static final int PARALLELISM = 1;
+
+    private static OneShotLatch LATCH = new OneShotLatch();
+
+    public static void main(String[] args)
+            throws RunnerException {
+        Options options = new OptionsBuilder()
+                .verbosity(VerboseMode.NORMAL)
+                .include(".*" + ProcessingTimerBenchmark.class.getCanonicalName() + ".*")
+                .build();
+
+        new Runner(options).run();
+    }
+
+    @Benchmark
+    public void fireProcessingTimers(FlinkEnvironmentContext context) throws Exception {
+        LATCH.reset();
+        StreamExecutionEnvironment env = context.env;
+        env.setParallelism(PARALLELISM);
+
+        env.addSource(new SingleRecordSource())
+                .keyBy(String::hashCode)
+                .process(new ProcessingTimerKeyedProcessFunction(PROCESSING_TIMERS_PER_INVOCATION))
+                .addSink(new DiscardingSink<>());
+
+        env.execute();
+    }
+
+    private static class SingleRecordSource extends RichParallelSourceFunction {
+
+        private Random random;
+
+        public SingleRecordSource() {}
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            this.random = new Random();
+        }
+
+        @Override
+        public void run(SourceContext sourceContext) throws Exception {
+            synchronized (sourceContext.getCheckpointLock()) {
+                sourceContext.collect(String.valueOf(random.nextLong()));
+            }
+
+            LATCH.await();
+        }
+
+        @Override
+        public void cancel() {}
+    }
+
+    private static class ProcessingTimerKeyedProcessFunction extends KeyedProcessFunction {
+
+        private final long timersPerRecord;
+        private long firedTimesCount;
+
+        public ProcessingTimerKeyedProcessFunction(long timersPerRecord) {
+            this.timersPerRecord = timersPerRecord;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            this.firedTimesCount = 0;
+        }
+
+        @Override
+        public
[flink] branch master updated: [FLINK-20562][sql-client][table] Support explain details for EXPLAIN statement
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 3d1c11e  [FLINK-20562][sql-client][table] Support explain details for EXPLAIN statement
3d1c11e is described below

commit 3d1c11e433f28817f1bb70d5ed028c81db705f27
Author: zhaown <51357674+chao...@users.noreply.github.com>
AuthorDate: Wed Jul 21 18:41:14 2021 +0800

    [FLINK-20562][sql-client][table] Support explain details for EXPLAIN statement

    This closes #15317
---
 .../flink/table/client/cli/CliClientITCase.java    |   7 +-
 .../src/test/resources/sql/table.q                 | 238 +
 .../src/main/codegen/data/Parser.tdd               |  10 +-
 .../src/main/codegen/includes/parserImpls.ftl      |  36 +++-
 .../flink/sql/parser/dql/SqlRichExplain.java       |  22 +-
 .../flink/sql/parser/utils/ParserResource.java     |   3 +
 .../ParserResource.properties                      |   1 +
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  60 +-
 .../table/api/internal/TableEnvironmentImpl.java   |   8 +-
 .../flink/table/operations/ExplainOperation.java   |  29 ++-
 .../operations/SqlToOperationConverter.java        |   7 +-
 .../operations/SqlToOperationConverterTest.java    |  34 +++
 .../testExecuteSqlWithExplainDetailsAndUnion.out   |  55 +
 .../testExecuteSqlWithExplainDetailsInsert.out     |  70 ++
 .../testExecuteSqlWithExplainDetailsSelect.out     |  45
 .../flink/table/api/TableEnvironmentTest.scala     | 122 ---
 16 files changed, 703 insertions(+), 44 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
index be8d277..e576d09 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
@@ -302,13 +302,18 @@ public class CliClientITCase extends AbstractTestBase {
             out.append(sqlScript.comment).append(sqlScript.sql);
             if (i < results.size()) {
                 Result result = results.get(i);
-                out.append(result.content).append(result.highestTag.tag).append("\n");
+                String content = removeStreamNodeId(result.content);
+                out.append(content).append(result.highestTag.tag).append("\n");
             }
         }
 
         return out.toString();
     }
 
+    private static String removeStreamNodeId(String s) {
+        return s.replaceAll("\"id\" : \\d+", "\"id\" : ");
+    }
+
     private static final class Result {
         final String content;
         final Tag highestTag;
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q
index 157808e..31797f3 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q
@@ -447,3 +447,241 @@ Sink(table=[default_catalog.default_database.orders2], fields=[user, product, am
    +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
 
 !ok
+
+# test explain insert with json format
+explain json_execution_plan insert into orders2 select `user`, product, amount, ts from orders;
+== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3])
+   +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+      +- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+         +- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+   +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
++- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+   +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])",
+    "pact" : "Data Source",
+    "contents" : "Source: TableSourceScan(table=[[default_catalog, default_database,
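
The `removeStreamNodeId` helper added to CliClientITCase in the diff above explains why the expected JSON plan in table.q shows `"id" : ,` with no number: stream node ids appear to vary between runs, so the test blanks them before comparing against the golden file. A minimal standalone sketch of that masking step follows. The regex and replacement string are copied from the diff; the class name `StreamNodeIdMasker`, the `main` method, and the sample plan string are made up for illustration and are not part of the commit.

```java
public class StreamNodeIdMasker {

    // Same pattern as in the diff: match the literal text "id" : followed by
    // one or more digits, and keep only the "id" : prefix.
    public static String removeStreamNodeId(String s) {
        return s.replaceAll("\"id\" : \\d+", "\"id\" : ");
    }

    public static void main(String[] args) {
        // Hypothetical fragment of a JSON execution plan (not taken from the commit).
        String plan =
                "{\n"
                        + "  \"nodes\" : [ {\n"
                        + "    \"id\" : 17,\n"
                        + "    \"pact\" : \"Data Source\"\n"
                        + "  } ]\n"
                        + "}";

        // The digits after "id" : are dropped; everything else is unchanged.
        System.out.println(removeStreamNodeId(plan));
    }
}
```

Note that the pattern depends on the exact spacing `"id" : ` used in the plan output; a plan serialized as `"id":17` would pass through unmasked.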
[flink] branch release-1.12 updated (da62d0d -> 600ce81)
This is an automated email from the ASF dual-hosted git repository.

yuanmei pushed a change to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from da62d0d  [FLINK-20975][hive][tests] Allow integral partition filter pushdown
     add 600ce81  [FLINK-23419][streaming] Fixed the condition in testNextFirstCheckpointBarrierOvertakesCancellationBarrier in order to make the test more stable (#16534)

No new revisions were added by this update.

Summary of changes:
 .../runtime/io/CheckpointBarrierTrackerTest.java   | 30 --
 1 file changed, 22 insertions(+), 8 deletions(-)
[flink-shaded] tag release-14.0 created (now 4782c96)
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to tag release-14.0
in repository https://gitbox.apache.org/repos/asf/flink-shaded.git.

      at 4782c96  (commit)

No new revisions were added by this update.
svn commit: r48919 - /dev/flink/flink-shaded-14.0-rc1/ /release/flink/flink-shaded-14.0/
Author: chesnay
Date: Wed Jul 21 07:26:16 2021
New Revision: 48919

Log:
Release Flink-shaded 14.0

Added:
    release/flink/flink-shaded-14.0/
      - copied from r48918, dev/flink/flink-shaded-14.0-rc1/
Removed:
    dev/flink/flink-shaded-14.0-rc1/
[flink] branch master updated (ae136d1 -> ed39fb2)
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.

    from ae136d1  [FLINK-23186][docs] improve installation page
     add ed39fb2  [FLINK-23395] Bump Okhttp to 3.14.9

No new revisions were added by this update.

Summary of changes:
 .../flink-end-to-end-tests-common/pom.xml            |  1 -
 .../flink-metrics-reporter-prometheus-test/pom.xml   |  1 -
 flink-kubernetes/pom.xml                             | 18 +++---
 flink-kubernetes/src/main/resources/META-INF/NOTICE  |  6 +++---
 flink-metrics/flink-metrics-datadog/pom.xml          | 10 +-
 .../src/main/resources/META-INF/NOTICE               |  4 ++--
 .../src/main/resources/META-INF/NOTICE               |  4 ++--
 pom.xml                                              | 12
 8 files changed, 35 insertions(+), 21 deletions(-)