[jira] [Created] (FLINK-23005) Optimize the deployment of tasks
Zhilong Hong created FLINK-23005: Summary: Optimize the deployment of tasks Key: FLINK-23005 URL: https://issues.apache.org/jira/browse/FLINK-23005 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhilong Hong Fix For: 1.14.0 h3. Introduction The optimizations introduced in FLINK-21110 so far have improved the performance of job initialization, failover, and partition releasing. However, task deployment is still slow. For a job with two vertices, each with a parallelism of 8k and connected by an all-to-all edge, it takes 32.611s to deploy all the tasks and transition them to running. If the parallelism is 16k, it may take more than 2 minutes. Since the creation of TaskDeploymentDescriptors runs in the main thread of the jobmanager, the jobmanager cannot handle other Akka messages, such as heartbeats and task status updates, for more than two minutes. In summary, there are currently two issues in the deployment of tasks for large-scale jobs: # It takes a long time to deploy tasks, especially for all-to-all edges. # Heartbeat timeouts may happen during or after task deployment. For a streaming job, this causes a failover of the entire region. The job may never transition to running, since another heartbeat timeout may occur while the new tasks are being deployed. h3. Proposal Task deployment involves the following procedures: # The jobmanager creates a TaskDeploymentDescriptor for each task in the main thread # The TaskDeploymentDescriptor is serialized in the future executor # Akka transports the TaskDeploymentDescriptors to the TaskExecutors via RPC calls # The TaskExecutors create a new task thread and execute it The optimization consists of two parts: *1. Cache the compressed serialized value of ShuffleDescriptors* ShuffleDescriptors in the TaskDeploymentDescriptor are used to describe the IntermediateResultPartitions that a task consumes.
For downstream vertices connected by an all-to-all edge with parallelism _N_, we currently calculate _N_ ShuffleDescriptors _N_ times. However, these vertices all share the same ShuffleDescriptors, since they consume the same IntermediateResultPartitions. We don't need to calculate the ShuffleDescriptors for each downstream vertex individually; we can simply cache them. This decreases the overall complexity of calculating TaskDeploymentDescriptors from O(N^2) to O(N). Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ times, so we can cache the serialized value of the ShuffleDescriptors instead of the original objects. To decrease the size of Akka messages and reduce replicated data over the network, these serialized values can be compressed. *2. Distribute the ShuffleDescriptors via the blob server* For the ShuffleDescriptors of vertices with 8k parallelism, the size of the serialized value is more than 700 kilobytes. After compression, it is around 200 kilobytes. The overall size of 8k TaskDeploymentDescriptors is more than 1.6 gigabytes. Since Akka cannot send the messages as fast as the TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors become a heavy burden for the garbage collector. In the TaskDeploymentDescriptor, JobInformation and TaskInformation are distributed via the blob server if their sizes exceed a certain threshold (defined by {{blob.offload.minsize}}). TaskExecutors request the information from the blob server once they begin to process the TaskDeploymentDescriptor. This makes sure that the jobmanager doesn't need to keep all the copies in heap memory until all the TaskDeploymentDescriptors are sent; there is only one copy, on the blob server. Like the JobInformation, we can distribute the cached ShuffleDescriptors via the blob server if their overall size exceeds the threshold.
h3. Summary In summary, the optimization of task deployment is to introduce a cache for the TaskDeploymentDescriptor. We cache the compressed serialized value of the ShuffleDescriptors. If the size of the value exceeds a certain threshold, the value is distributed via the blob server. h3. Comparison We implemented a POC and conducted an experiment to compare the performance with and without our optimization. We chose a streaming job for the experiment because no task runs until all tasks are deployed, which avoids other disturbing factors. The job contains two vertices, a source and a sink, connected by an all-to-all edge. The results illustrated below are the time interval between the timestamp of the first task transitioning to _deploying_ and the timestamp of the last task transitioning to _running_: ||Parallelism||Before||After|| |8000*8000|32.611s|6.480s|
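The caching idea in part 1 can be sketched with plain JDK serialization and compression. All class and method names below are illustrative stand-ins, not the actual Flink implementation: the point is that the serialized, compressed bytes are computed once per shared descriptor group and reused for every downstream task.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

/** Illustrative sketch: serialize + compress a shared descriptor set once and
 *  reuse the bytes for all N downstream tasks, instead of recomputing N times. */
class ShuffleDescriptorCache {
    private final Map<String, byte[]> cache = new ConcurrentHashMap<>();

    /** Returns the compressed serialized bytes, computing them only once per key. */
    byte[] getOrCompute(String resultPartitionGroupId, Serializable descriptors) {
        return cache.computeIfAbsent(resultPartitionGroupId, id -> {
            try {
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                try (ObjectOutputStream out = new ObjectOutputStream(
                        new DeflaterOutputStream(bytes, new Deflater(Deflater.BEST_SPEED)))) {
                    out.writeObject(descriptors);
                }
                // Part 2 of the proposal: if bytes.toByteArray().length exceeded
                // blob.offload.minsize, the bytes would be offloaded to the blob server.
                return bytes.toByteArray();
            } catch (IOException e) {
                throw new RuntimeException("Serialization of descriptors failed", e);
            }
        });
    }
}
```

With this shape, building the N TaskDeploymentDescriptors becomes N cheap cache lookups instead of N serializations, which is where the O(N^2) → O(N) reduction comes from.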
[jira] [Created] (FLINK-23004) Fix misleading log
Junfan Zhang created FLINK-23004: Summary: Fix misleading log Key: FLINK-23004 URL: https://issues.apache.org/jira/browse/FLINK-23004 Project: Flink Issue Type: Improvement Reporter: Junfan Zhang -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23003) resource leak in RocksIncrementalSnapshotStrategy
Yanfei Lei created FLINK-23003: -- Summary: resource leak in RocksIncrementalSnapshotStrategy Key: FLINK-23003 URL: https://issues.apache.org/jira/browse/FLINK-23003 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Runtime / State Backends Affects Versions: 1.14.0 Environment: Flink: 1.14-SNAPSHOT Reporter: Yanfei Lei Fix For: 1.14.0 We found that `RocksDBStateUploader` in `RocksIncrementalSnapshotStrategy` is not closed correctly after being used. It would lead to a resource leak. `RocksDBStateUploader` inherits `RocksDBStateDataTransfer`, and `RocksDBStateDataTransfer` holds an `ExecutorService`. `RocksDBStateUploader` uses the `ExecutorService` to upload files to DFS asynchronously. When `RocksDBKeyedStateBackend` is cleaned up, all resources held by the backend should be closed, but now `RocksIncrementalSnapshotStrategy` lacks a close() function. And we encountered an example caused by this problem. When we benchmarked the performance of incremental rescaling, we observed that the forked VM of JMH can't exit normally. {code:java} [INFO] [INFO] --- exec-maven-plugin:1.6.0:exec (default-cli) @ benchmark --- # JMH version: 1.19 # VM version: JDK 1.8.0_281, VM 25.281-b09 # VM invoker: /home/leiyanfei.lyf/jdk1.8.0_281/jre/bin/java # VM options: -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.ssl # Warmup: 10 iterations, 1 s each # Measurement: 10 iterations, 1 s each # Timeout: 10 min per iteration # Threads: 1 thread, will synchronize iterations # Benchmark mode: Average time, time/op # Benchmark: org.apache.flink.state.RocksIncrementalCheckpointScaleUpBenchmark.ScalingUp # Parameters: (numberElements = 100, parallelism1 = 2, parallelism2 = 3)# Run progress: 0.00% complete, ETA 00:02:00 # Fork: 1 of 3 # Warmup Iteration 1: 244.717 ms/op # Warmup Iteration 2: 104.749 ms/op # Warmup Iteration 3: 104.182 ms/op ... 
Iteration 1: 96.600 ms/op Iteration 2: 108.463 ms/op Iteration 3: 93.657 ms/op ...Non-finished threads: ... Thread[pool-15-thread-4,5,main] at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} The root cause of this example is that the `RocksDBStateUploader` in `RocksIncrementalSnapshotStrategy` is not closed when `RocksDBKeyedStateBackend` is disposed. The solution is quite straightforward: the `RocksDBStateUploader` in `RocksIncrementalSnapshotStrategy` can be closed when cleaning up `RocksDBKeyedStateBackend`.
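The proposed fix can be illustrated with a minimal stand-alone sketch. The class names mirror the report, but the bodies are hypothetical simplifications: closing the uploader shuts down its internal ExecutorService, so the pool's non-daemon threads can exit and no longer keep the (forked) JVM alive.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Simplified stand-in for RocksDBStateDataTransfer / RocksDBStateUploader. */
class StateUploader implements AutoCloseable {
    private final ExecutorService executorService = Executors.newFixedThreadPool(4);

    void uploadAsync(Runnable uploadTask) {
        executorService.submit(uploadTask);
    }

    boolean isShutdown() {
        return executorService.isShutdown();
    }

    @Override
    public void close() {
        // Without this, the pool's non-daemon threads stay parked in
        // LinkedBlockingQueue.take() — exactly the stack trace seen above.
        executorService.shutdownNow();
    }
}

/** Simplified stand-in for RocksDBKeyedStateBackend. */
class KeyedStateBackend implements AutoCloseable {
    private final StateUploader uploader = new StateUploader();

    @Override
    public void close() {
        // The proposed fix: dispose the snapshot strategy's uploader together
        // with the backend, instead of leaking its thread pool.
        uploader.close();
    }
}
```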
Re: [DISCUSS] Moving to JUnit5
Sorry for following up so late. A while ago, I spiked a junit 5 migration. To recap: here is the migration plan. 0. (There is a way to use JUnit4 + 5 at the same time in a project - you'd > use a specific JUnit4 runner to execute JUnit5. I'd like to skip this > step as it would slow down migration significantly) > 1. Use JUnit5 with vintage runner. JUnit4 tests run mostly out of the > box. The most important difference is that only 3 base rules are supported > and the remainder needs to be migrated. Luckily, most of our rules derive > from the supported ExternalResource. So in this step, we would need to > migrate the rules. > 2. Implement new tests in JUnit5. > 3. Soft-migrate old tests in JUnit5. This is mostly a renaming of > annotations (@Before -> @BeforeEach, etc.). Adjust parameterized tests > (~400), replace rule usages (~670) with extensions, exception handling > (~1600 tests), and timeouts (~200). This can be done on a test class by > test class basis and there is no hurry. > 4. Remove vintage runner, once most tests are migrated by doing a final > push for lesser used modules. > Here are my insights: 0. works but I don't see the benefit 1. works well [1] with a small diff [2]. Note that the branch is based on a quite old master. 2. works well as well [3]. 2a. However, we should be aware that we need to port quite a few rules to extensions before we can implement more complex JUnit5 tests, especially ITCases (I'd probably skip junit-jupiter-migrationsupport that allows us to reuse _some_ rules using specific base classes). We have ~10-15 rules that need to be ported. 3. Soft migration will take forever and probably never finish. Many tests can be automatically ported with some (I used 8) simple regexes. I'd rather do a hard migration of all tests at a particular point (no freeze) and have that git commit excluded from blame, similar to the spotless commit. 3a.
A huge chunk of changes (>90%) comes from the optional message in assertX being moved from the first to the last position. @Chesnay Schepler proposed to rather implement our own Assertion class in the old junit package that translates it. But this would need to go hand in hand with 4) to avoid name clash. It could also just be a temporary thing that we use during hard migration and then inline before merging. 4. If we do hard migration, we should probably do that in a follow-up PR (contains just the migrations of the tests that have been added in the meantime). Here is my time-limited take on the hard migration [4]. It was a matter of ~10h. [1] https://dev.azure.com/arvidheise0209/arvidheise/_build/results?buildId=1208=ms.vss-test-web.build-test-results-tab=24838=10=debug [2] https://github.com/AHeise/flink/commit/7f3e7faac9ba53615bda89e51d5fd17d940c4a55 [3] https://github.com/AHeise/flink/commit/c0dd3d12fbd07b327b560107396ee0bb1e2d8969 [4] https://github.com/apache/flink/compare/master...AHeise:junit5?expand=1 On Tue, Dec 1, 2020 at 9:54 AM Khachatryan Roman < khachatryan.ro...@gmail.com> wrote: > +1 for the migration > > (I agree with Dawid, for me the most important benefit is better support of > parameterized tests). > > Regards, > Roman > > > On Mon, Nov 30, 2020 at 9:42 PM Arvid Heise wrote: > > > Hi Till, > > > > immediate benefit would be mostly nested tests for a better test > structure > > and new parameterized tests for less clutter (often test functionality is > > split into parameterized test and non-parameterized test because of > JUnit4 > > limitation). Additionally, having Java8 lambdas to perform fine-grain > > exception handling would make all related tests more readable (@Test only > > allows one exception per test method, while in reality we often have more > > exceptions / more fine grain assertions and need to resort to try-catch > -- > > yuck!). 
The extension mechanism would also make the mini cluster much > > easier to use: we often have to start the cluster manually because of > > test-specific configuration, which can be easily avoided in JUnit5. > > > > In the medium and long-term, I'd also like to use the modular > > infrastructure and improved parallelization. The former would allow us > > better to implement cross-cutting features like TestLogger (why do we > need > > to extend that manually in every test?). The latter is more relevant for > > the next push on CI, which would be especially interesting with e2e being > > available in Java. > > > > On Mon, Nov 30, 2020 at 2:07 PM Dawid Wysakowicz > > > wrote: > > > > > Hi all, > > > > > > Just wanted to express my support for the idea. I did miss certain > > > features of JUnit 5 already, an important one being much better support > > > for parameterized tests. > > > > > > Best, > > > > > > Dawid > > > > > > On 30/11/2020 13:50, Arvid Heise wrote: > > > > Hi Chesnay, > > > > > > > > The vintage runner supports the old annotations, so we don't have to > > > change > > > > them in the first step. > > > > > > > > The only thing that we need to change
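The "simple regexes" mentioned for the hard migration can be sketched in plain Java. The rules below are illustrative stand-ins, not the actual eight regexes used in the spike; they cover the annotation renames and the assertX message moving from the first to the last argument (only for simple one-line string literals):

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch of a regex-based JUnit4 -> JUnit5 source migrator. */
class JUnit5Migrator {
    // Order matters: @BeforeClass must be rewritten before the bare @Before rule.
    static final Map<String, String> RULES = new LinkedHashMap<>();
    static {
        RULES.put("@BeforeClass", "@BeforeAll");
        RULES.put("@AfterClass", "@AfterAll");
        RULES.put("@Before\\b", "@BeforeEach");
        RULES.put("@After\\b", "@AfterEach");
        // Move the optional message from the first to the last position
        // (handles only a simple quoted literal as the first argument).
        RULES.put("assertEquals\\((\"[^\"]*\"),\\s*([^;]*)\\)", "assertEquals($2, $1)");
    }

    static String migrate(String source) {
        for (Map.Entry<String, String> rule : RULES.entrySet()) {
            source = source.replaceAll(rule.getKey(), rule.getValue());
        }
        return source;
    }
}
```

For example, `migrate("@Before\nassertEquals(\"wrong sum\", 3, 1 + 2)")` rewrites the annotation to `@BeforeEach` and moves the message literal to the last argument. A real migration would need many more rules (rules/extensions, timeouts, expected exceptions), which is why the thread estimates ~10h for the hard push.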
Re: [DISCUSS] FLIP-171: Async Sink
Hey Steffen, I have a few questions regarding the FLIP: 1. Where do you expect the core code to live, would it be in an existing module (say flink-clients) or would you introduce a new module? 2. Which destination implementations do you intend to ship with this FLIP? I see an example with Kinesis but you also list a bunch of other candidates. 3. For the Kinesis implementation, would you add the Sink to the existing flink-connector-kinesis repo, or create a new module? The reason I ask is that the existing Kinesis Sink depends on KPL and has a heavy transitive dependency chain; removing this would substantially reduce application size and clean up the dependency chain. Thanks, On 10/06/2021, 09:09, "Hausmann, Steffen" wrote: CAUTION: This email originated from outside of the organization. Do not click links or open attachments unless you can confirm the sender and know the content is safe. Hey Piotrek, Thanks for your comments on the FLIP. I'll address your second question first, as I think it's more central to this FLIP. Just looking at the AWS ecosystem, there are several sinks with overlapping functionality. I've chosen AWS sinks here because I'm most familiar with those, but a similar argument applies more generically for destinations that support async ingest. There is, for instance, a sink for Amazon Kinesis Data Streams that is part of Apache Flink [1], a sink for Amazon Kinesis Data Firehose [2], a sink for Amazon DynamoDB [3], and a sink for Amazon Timestream [4]. All these sinks have implemented their own mechanisms for batching, persisting, and retrying events. And I'm not sure if all of them properly participate in checkpointing. [3] even seems to closely mirror [1] as it contains references to the Kinesis Producer Library, which is unrelated to Amazon DynamoDB. These sinks predate FLIP-143.
But as batching, persisting, and retrying capabilities do not seem to be part of FLIP-143, I'd argue that we would end up with similar duplication, even if these sinks were rewritten today based on FLIP-143. And that's the idea of FLIP-171: abstract away these commonly required capabilities so that it becomes easy to create support for a wide range of destinations without having to think about batching, retries, checkpointing, etc. I've included an example in the FLIP [5] that shows that it only takes a couple of lines of code to implement a sink with exactly-once semantics. To be fair, the example is lacking robust failure handling and some more advanced capabilities of [1], but I think it still supports this point. Regarding your point on the isAvailable pattern: we need some way for the sink to propagate backpressure, and we would also like to support time-based buffering hints. There are two options I currently see and I would need additional input on which one is the better or more desirable one. The first option is to use the non-blocking isAvailable pattern. Internally, the sink persists buffered events in the snapshot state, which avoids having to flush buffered records on a checkpoint. This seems to align well with the non-blocking isAvailable pattern. The second option is to make calls to `write` blocking and leverage an internal thread to trigger flushes based on time-based buffering hints. We've discussed these options with Arvid and decided to assume that the `isAvailable` pattern will become available for sinks through an additional FLIP. I think it is an important discussion to have. My understanding of the implications for Flink in general is very naïve, so I'd be happy to get further guidance. However, I don't want to make this discussion part of FLIP-171. For FLIP-171 we'll use whatever is available. Does that make sense?
Cheers, Steffen [1] https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kinesis [2] https://github.com/aws/aws-kinesisanalytics-flink-connectors [3] https://github.com/klarna-incubator/flink-connector-dynamodb [4] https://github.com/awslabs/amazon-timestream-tools/ [5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink#FLIP171:AsyncSink-SimplifiedAsyncSinkWriterforKinesisDataStreams On 09.06.21, 19:44, "Piotr Nowojski" wrote: Hi Steffen, Thanks for writing down the proposal. Back when the new Sink API was being discussed, I was proposing to add our usual `CompletableFuture isAvailable()` pattern to make sinks non-blocking. You can see the discussion starting here [1], and continuing for a couple of more posts until here [2]. Back then, the outcome was that it would give very little benefit, at the expense of making the API more complicated. Could you maybe relate your
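The batching/flushing core that FLIP-171 wants to factor out of individual sinks can be sketched as follows. Every name here is hypothetical and does not match any real Flink API; the destination call is modeled as an async callback so retry logic could be hung off the completion:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/** Hypothetical sketch of a shared batching writer: buffer elements, flush a
 *  batch to the async destination when full (or on checkpoint). */
class AsyncBatchWriter<T> {
    private final List<T> buffer = new ArrayList<>();
    private final int maxBatchSize;
    // (batch, onComplete) -> asynchronous call against the destination
    private final BiConsumer<List<T>, Runnable> submitBatch;

    AsyncBatchWriter(int maxBatchSize, BiConsumer<List<T>, Runnable> submitBatch) {
        this.maxBatchSize = maxBatchSize;
        this.submitBatch = submitBatch;
    }

    void write(T element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** A checkpoint would either trigger this flush or persist the buffer in
     *  the snapshot state, depending on which of the two options is chosen. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        // A real implementation would retry or fail the job in the callback.
        submitBatch.accept(batch, () -> {});
    }
}
```

Concrete sinks (Kinesis, Firehose, DynamoDB, ...) would then only supply the destination-specific `submitBatch` call, which is the deduplication argument made in the thread.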
[jira] [Created] (FLINK-23002) C# SDK for Stateful Functions
Konstantin Knauf created FLINK-23002: Summary: C# SDK for Stateful Functions Key: FLINK-23002 URL: https://issues.apache.org/jira/browse/FLINK-23002 Project: Flink Issue Type: New Feature Components: Build System / Stateful Functions Reporter: Konstantin Knauf
[jira] [Created] (FLINK-23001) flink-avro-glue-schema-registry lacks scala suffix
Chesnay Schepler created FLINK-23001: Summary: flink-avro-glue-schema-registry lacks scala suffix Key: FLINK-23001 URL: https://issues.apache.org/jira/browse/FLINK-23001 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.13.0 Reporter: Chesnay Schepler Fix For: 1.14.0 The dependency on flink-streaming-java implies a need for a scala suffix.
Re: [DISCUSS] FLIP-129 (Update): Registering sources/sinks on Table API without SQL
Thanks Ingo for starting this discussion. Big +1 for this feature. Looking forward to the feature. Best regards, JING ZHANG Jark Wu 于2021年6月15日周二 下午3:48写道: > Thanks Ingo for picking up this FLIP. > > FLIP-129 is an important piece to have a complete Table SQL story, > and users have been waiting for a long time. Let's finish it in this > release! > Your proposed changes look good to me. > > I also cc'd people who voted in previous FLIP-129. > > Best, > Jark > > On Thu, 10 Jun 2021 at 19:46, Ingo Bürk wrote: > > > Hello everyone, > > > > we would like to pick up work on FLIP-129 which aims to improve the Table > > API by supporting the creation of sources / sinks without having to go > > through SQL/DDL. This FLIP was approved a while ago, and some things have > > changed since then. We'd like to propose a few changes, see [1], before > > starting work on it. > > Our proposal is mainly motivated by reducing the scope in some parts to > > improve maintainability and relying more on ConfigOptions being the > single > > source of truth. We also want to expose this functionality for > > non-temporary tables. > > > > We'd like to open this for discussion to collect any feedback. Once the > > discussion has stabilized I'll update the FLIP itself and start a new > vote. > > > > [1] > > > > > https://docs.google.com/document/d/1tpirvF0u723QF005UrgdbvF-Tp0Jbg_qhlbda4mk7Ck/edit?usp=sharing > > > > > > Regards > > Ingo > > >
[jira] [Created] (FLINK-23000) Allow downloading all Flink logs from the UI
Robert Metzger created FLINK-23000: -- Summary: Allow downloading all Flink logs from the UI Key: FLINK-23000 URL: https://issues.apache.org/jira/browse/FLINK-23000 Project: Flink Issue Type: New Feature Components: Runtime / Web Frontend Reporter: Robert Metzger Assignee: Robert Metzger Fix For: 1.14.0 Users often struggle to provide all relevant logs. A button in the Web UI that collects the logs of all TaskManagers and the JobManager would make things easier for the community when supporting users.
[jira] [Created] (FLINK-22999) Using the column 'user' in Table API is not possible
Martijn Visser created FLINK-22999: -- Summary: Using the column 'user' in Table API is not possible Key: FLINK-22999 URL: https://issues.apache.org/jira/browse/FLINK-22999 Project: Flink Issue Type: Bug Components: Table SQL / API Reporter: Martijn Visser Using the following code: {code:java} final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); final DataStream clickStream = env.fromElements( Row.of("Bob", "ORDER_ENTERED", "1623758400"), Row.of("Alice", "ORDER_ENTERED", "1623758700"), Row.of("Bob", "ADDRESS_ENTERED", "1623759000")); final Table clickTable = tableEnv.fromDataStream(clickStream).as("user","pageType", "ts"); tableEnv.createTemporaryView("ClickStream", clickTable); tableEnv.executeSql("CREATE VIEW FirstPageVisits AS SELECT user, pageType, ts FROM ClickStream WHERE pageType = 'ORDER_ENTERED'"); clickTable.execute().print(); {code} Throws the following stacktrace: {code:java} Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 8 to line 1, column 11: Column 'USER' not found in any table; did you mean 'user'? 
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:164) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107) at org.apache.flink.table.planner.utils.Expander.expanded(Expander.java:86) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertViewQuery(SqlToOperationConverter.java:898) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateView(SqlToOperationConverter.java:868) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730) at org.apache.flink.table.examples.java.basics.StreamSQLExample.main(StreamSQLExample.java:69) Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 8 to line 1, column 11: Column 'USER' not found in any table; did you mean 'user'? 
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4861) at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:254) at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5833) at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:5982) at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:5967) at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:320) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5416) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:398) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4061) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3347) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975) at 
org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952) at
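The failure occurs because the unquoted identifier `user` collides with the reserved USER keyword in Calcite's SQL parser, so it is not resolved as a column. A likely workaround (an assumption on my part, not taken from the report) is to escape the identifier with backticks so it is parsed as a plain column reference:

```sql
-- USER is a reserved keyword; backticks turn it into an ordinary identifier
-- (workaround suggested here, not part of the original report):
CREATE VIEW FirstPageVisits AS
SELECT `user`, pageType, ts
FROM ClickStream
WHERE pageType = 'ORDER_ENTERED';
```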
Re: [VOTE] FLIP-169: DataStream API for Fine-Grained Resource Requirements
+1 (binding) Thank you~ Xintong Song On Tue, Jun 15, 2021 at 6:21 PM Arvid Heise wrote: > LGTM +1 (binding) from my side. > > On Tue, Jun 15, 2021 at 11:00 AM Yangze Guo wrote: > > > Hi everyone, > > > > I'd like to start the vote of FLIP-169 [1]. This FLIP is discussed in > > the thread[2]. > > > > The vote will be open for at least 72 hours. Unless there is an > > objection, I will try to close it by Jun. 18, 2021 if we have received > > sufficient votes. > > > > [1] > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-169+DataStream+API+for+Fine-Grained+Resource+Requirements > > [2] > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-169-DataStream-API-for-Fine-Grained-Resource-Requirements-td51071.html > > > > Best, > > Yangze Guo > > >
[jira] [Created] (FLINK-22998) Flink SQL does not support block comment before SET command
Zhiwen Sun created FLINK-22998: -- Summary: Flink SQL does not support block comment before SET command Key: FLINK-22998 URL: https://issues.apache.org/jira/browse/FLINK-22998 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.14.0 Reporter: Zhiwen Sun Flink SQL does not support a block comment before the SET command. A tiny SQL script that reproduces the bug: {code:java} /** comment **/ SET sql-client.execution.result-mode=TABLEAU; SELECT 'hello';{code} while the following SQL works fine: {code:java} SET sql-client.execution.result-mode=TABLEAU; /** comment **/ SELECT 'hello';{code} After debugging the Flink source code, I found that the EXTENDED_PARSER does not support block comments.
[jira] [Created] (FLINK-22997) Replace AkkaUtils#get*Timeout*
Chesnay Schepler created FLINK-22997: Summary: Replace AkkaUtils#get*Timeout* Key: FLINK-22997 URL: https://issues.apache.org/jira/browse/FLINK-22997 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.14.0 {{AkkaUtils}} contains several methods for accessing timeout-related configuration options, such as AkkaOptions#ASK_TIMEOUT or LOOKUP_TIMEOUT, as a Time or Duration. Instead, let's introduce newer versions of these options that accept durations, access the options directly, and add a converting factory method from Duration -> Time.
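The converting factory method mentioned at the end could look like the sketch below. The real {{Time}} class lives in Flink; a minimal stand-in is used here so the example is self-contained, and the {{fromDuration}} signature is an assumption, not the actual API:

```java
import java.time.Duration;

/** Minimal stand-in for org.apache.flink.api.common.time.Time, only to show
 *  the proposed Duration -> Time converting factory method. */
final class Time {
    private final long millis;

    private Time(long millis) {
        this.millis = millis;
    }

    static Time milliseconds(long millis) {
        return new Time(millis);
    }

    /** The converting factory method proposed in the ticket (name assumed). */
    static Time fromDuration(Duration duration) {
        return milliseconds(duration.toMillis());
    }

    long toMilliseconds() {
        return millis;
    }
}
```

Call sites could then read the option as a {{Duration}} directly and convert only where a legacy {{Time}} is still required.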
Re: Apply for FLIP Wiki Page Edit Permission
Done. You should now have the access rights. Cheers, Till On Tue, Jun 15, 2021 at 10:18 AM Roc Marshal wrote: > Hi, > > > > > I want to contribute to Flink, but I can't create FLIP wiki page now. > Would someone please give me the confluence permission ? > > My Confluence ID is roc-marshal. Full name is RocMarshal. > > My JIRA ID is RocMarshal. > > Thank you . > > > > > Best, Roc.
Re: [VOTE] FLIP-169: DataStream API for Fine-Grained Resource Requirements
LGTM +1 (binding) from my side. On Tue, Jun 15, 2021 at 11:00 AM Yangze Guo wrote: > Hi everyone, > > I'd like to start the vote of FLIP-169 [1]. This FLIP is discussed in > the thread[2]. > > The vote will be open for at least 72 hours. Unless there is an > objection, I will try to close it by Jun. 18, 2021 if we have received > sufficient votes. > > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-169+DataStream+API+for+Fine-Grained+Resource+Requirements > [2] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-169-DataStream-API-for-Fine-Grained-Resource-Requirements-td51071.html > > Best, > Yangze Guo >
[jira] [Created] (FLINK-22996) The description about coalesce is wrong
lothar created FLINK-22996: -- Summary: The description about coalesce is wrong Key: FLINK-22996 URL: https://issues.apache.org/jira/browse/FLINK-22996 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.13.0 Reporter: lothar Attachments: image-2021-06-15-17-14-56-270.png On the Flink website, the description of the coalesce function is wrong. The document link is [https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/systemfunctions] !image-2021-06-15-17-14-56-270.png! It should be “{{COALESCE(NULL, 5)}} returns 5”.
Re: [DISCUSS]FLIP-150: Introduce Hybrid Source
Hi Arvid,

Thanks for your reply.

On Mon, Jun 14, 2021 at 2:55 PM Arvid Heise wrote:
>
> Hi Thomas,
>
> Thanks for bringing this up. I think this is a tough nut to crack :/.
> Imho 1 and 3 or 1+3 can work but it is ofc a pity if the source implementor
> is not aware of HybridSource. I'm also worried that we may not have a
> universal interface to specify start offset/time.
> I guess it also would be much easier if we would have an abstract base
> source class where we could implement some basic support.
>
> When I initially looked at the issue I was thinking that sources should
> always be immutable (we have some bad experiences with mutable life-cycles
> in operator implementations) and the only modifiable thing should be the
> builder. That would mean that a HybridSource actually just gets a list of
> source builders and creates the sources when needed with the correct
> start/end offset. However, we neither have base builders (something that
> I'd like to change) nor are any of the builders serializable. We could
> convert sources back to builders, update the start offset, and convert to
> sources again but this also seems overly complicated. So I'm assuming that
> we should go with modifiable sources as also expressed in the FLIP draft.

The need to set a start position at runtime indicates that sources should not be immutable. I think it would be better to have a setter on the source that clearly describes the mutation.

Regarding deferred construction of the sources (supplier pattern): This is actually a very interesting idea that would also help in situations where the exact sequence of sources isn't known upfront. However, Source is also the factory for split and enumerator checkpoint serializers. If we were to instantiate the source at switch time, we would also need to distribute the serializers at switch time. This would lead to even more complexity and move us further away from the original goal of having a relatively simple implementation for the basic scenarios.

> If we could assume that we are always switching by time, we could also
> change Source(Enumerator)#start to take the start time as a parameter. Can
> we deduce the end time by the record timestamp? But I guess that has all
> been discussed already, so sorry if I derail the discussion.

This actually hasn't been discussed. The original proposal left the type of the start position open, which also makes it less attractive (user still has to supply a converter). For initial internal usage of the hybrid source, we are planning to use a timestamp. But there may be use cases where the start position could be encoded in other ways, such as based on Kafka offsets.

> I'm also leaning towards extending the Source interface to include these
> methods (with defaults) to make it harder for implementers to miss.

It would be possible to introduce an optional interface as a follow-up task. It can be implemented as the default of option 3.

> On Fri, Jun 11, 2021 at 7:02 PM Thomas Weise wrote:
>
> > Thanks for the suggestions and feedback on the PR.
> >
> > A variation of hybrid source that can switch back and forth was
> > brought up before and it is something that will be eventually
> > required. It was also suggested by Stephan that in the future there
> > may be more than one implementation of hybrid source for different
> > requirements.
> >
> > I want to bring back the topic of how enumerator end state can be
> > converted into start position from the PR [1]. We started in the FLIP
> > page with "switchable" interfaces, the prototype had checkpoint
> > conversion and now the PR has a function that allows to augment the
> > source. Each of these has pros and cons but we will need to converge.
> >
> > 1. Switchable interfaces
> > * unified solution
> > * requires sources to implement a special interface to participate in
> > HybridSource, even when no dynamic conversion is needed
> >
> > 2. Checkpoint state
> > * unified solution
> > * no interface changes
> > * requires implementation change to existing enumerators to include
> > end state (like a timestamp) into their checkpoint state
> > * existing sources work as is for fixed start position
> >
> > 3. Source modification at switch time to set start position
> > * can be solved per source, least restrictive
> > * no interface changes
> > * requires enumerator to expose end state (as a getter) and source to
> > be either mutable or source to be copied and augmented with the start
> > position.
> > * existing sources work as is for fixed start position
> >
> > I think more eyes might help to finalize the approach.
> >
> > [1] https://github.com/apache/flink/pull/15924#discussion_r649929865
> >
> > On Mon, Jun 7, 2021 at 11:18 PM Steven Wu wrote:
> >
> > > > hybrid sounds to me more like the source would constantly switch back
> > and forth
> > >
> > > Initially, the focus of hybrid source is more like a sequenced chain.
> > >
> > > But in the future it would be cool that
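Option 3 above can be sketched with plain-Java stand-ins. All names below (EndStateAware, StartPositionSettable, the source stubs) are hypothetical illustrations of the pattern, not the actual Flink Source interfaces:

```java
import java.util.Optional;

// Sketch of option 3: the finished source's enumerator exposes its end
// state via a getter, and the next source accepts a start position through
// a clearly named setter. These types are illustrative stand-ins only.
class HybridSwitchSketch {

    public interface EndStateAware {
        // End position reached by the enumerator, e.g. a timestamp.
        Optional<Long> endTimestamp();
    }

    public interface StartPositionSettable {
        // An explicit mutation that describes exactly what changes,
        // rather than a generally mutable source.
        void setStartTimestamp(long timestamp);
    }

    public static class BoundedFileSourceStub implements EndStateAware {
        private final long lastRecordTimestamp;

        public BoundedFileSourceStub(long lastRecordTimestamp) {
            this.lastRecordTimestamp = lastRecordTimestamp;
        }

        @Override
        public Optional<Long> endTimestamp() {
            return Optional.of(lastRecordTimestamp);
        }
    }

    public static class UnboundedKafkaSourceStub implements StartPositionSettable {
        // Fixed start position unless overridden at switch time.
        public long startTimestamp = 0L;

        @Override
        public void setStartTimestamp(long timestamp) {
            this.startTimestamp = timestamp;
        }
    }

    // At switch time, convert the end state of the finished source
    // into the start position of the next one.
    public static void switchOver(EndStateAware finished, StartPositionSettable next) {
        finished.endTimestamp().ifPresent(next::setStartTimestamp);
    }
}
```

Sources that only ever need a fixed start position simply would not implement StartPositionSettable, which mirrors the "existing sources work as is" property of option 3.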
Re: Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements
Thanks for the supplement, Arvid and Yun. I've annotated these two points in the FLIP. The vote is now started in [1].

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-169-DataStream-API-for-Fine-Grained-Resource-Requirements-td51381.html

Best,
Yangze Guo

On Fri, Jun 11, 2021 at 2:50 PM Yun Gao wrote:
>
> Hi,
>
> Very thanks @Yangze for bringing up this discuss. Overall +1 for
> exposing the fine-grained resource requirements in the DataStream API.
>
> One similar issue as Arvid has pointed out is that users may also creating
> different SlotSharingGroup objects, with different names but with different
> resources. We might need to do some check internally. But We could also
> leave that during the development of the actual PR.
>
> Best,
> Yun
>
> --Original Mail --
> Sender:Arvid Heise
> Send Date:Thu Jun 10 15:33:37 2021
> Recipients:dev
> Subject:Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements
>
> Hi Yangze,
>
> Thanks for incorporating the ideas and sorry for missing the builder part.
> My main idea is that SlotSharingGroup is immutable, such that the user
> doesn't do:
>
> ssg = new SlotSharingGroup();
> ssg.setCpus(2);
> operator1.slotSharingGroup(ssg);
> ssg.setCpus(4);
> operator2.slotSharingGroup(ssg);
>
> and wonders why both operators have the same CPU spec. But the details can
> be fleshed out in the actual PR.
>
> On Thu, Jun 10, 2021 at 5:13 AM Yangze Guo wrote:
> >
> > Thanks all for the discussion. I've updated the FLIP accordingly, the
> > key changes are:
> > - Introduce SlotSharingGroup instead of ResourceSpec which contains
> > the resource spec of slot sharing group
> > - Introduce two interfaces for specifying the SlotSharingGroup:
> > #slotSharingGroup(SlotSharingGroup) and
> > StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup).
> >
> > If there is no more feedback, I'd start a vote next week.
> >
> > Best,
> > Yangze Guo
> >
> > On Wed, Jun 9, 2021 at 10:46 AM Yangze Guo wrote:
> > >
> > > Thanks for the valuable suggestion, Arvid.
> > >
> > > 1) Yes, we can add a new SlotSharingGroup which includes the name and
> > > its resource. After that, we have two interfaces for configuring the
> > > slot sharing group of an operator:
> > > - #slotSharingGroup(String name) // the resource of it can be
> > > configured through StreamExecutionEnvironment#registerSlotSharingGroup
> > > - #slotSharingGroup(SlotSharingGroup ssg) // Directly configure the resource
> > > And one interface to configure the resource of a SSG:
> > > - StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)
> > > We can also define the priority of the above two approaches, e.g. the
> > > resource registering in the StreamExecutionEnvironment will always be
> > > respected when conflict. That would be well documented.
> > >
> > > 2) Yes, I originally add this interface as a shortcut. It seems
> > > unnecessary now. Will remove it.
> > >
> > > 3) I don't think we need to expose the ExternalResource. In the
> > > builder of SlotSharingGroup, we can introduce a
> > > #withExternalResource(String name, double value). Also, this interface
> > > needs to be annotated as evolving.
> > >
> > > 4) Actually, I've mentioned it in the FLIP. Maybe it would be good to
> > > elaborate on the Builder for the SlotSharingGroup.
> > >
> > > WDYT?
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Tue, Jun 8, 2021 at 6:51 PM Arvid Heise wrote:
> > > >
> > > > Hi Yangze,
> > > >
> > > > I like the general approach to bind requirements to slotsharing groups. I
> > > > think the current approach is also flexible enough that a user could simply
> > > > use ParameterTool or similar to use config values and wire that with their
> > > > slotgroups, such that different requirements can be tested without
> > > > recompilation. So I don't see an immediate need to provide a generic
> > > > solution for yaml configuration for now.
> > > >
> > > > Looking at the programmatic interface though, I think we could improve by
> > > > quite a bit and I haven't seen these alternatives being considered in the
> > > > FLIP:
> > > > 1) Add new class SlotSharingGroup that incorporates all ResourceSpec
> > > > properties. Instead of using group names, the user could directly configure
> > > > such an object.
> > > >
> > > > SlotSharingGroup ssg1 = new SlotSharingGroup("ssg-1"); // name
> > > > could also be omitted and auto-generated
> > > > ssg1.setCPUCores(4);
> > > > ...
> > > > DataStream> grades =
> > > > GradeSource
> > > > .getSource(env, rate)
> > > > .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
> > > > .slotSharingGroup(ssg1);
> > > > DataStream> salaries =
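Arvid's immutability concern in the quoted thread can be sketched as follows: if the slot sharing group object is built once and never mutated, a later setter call cannot silently change the spec already attached to an operator. The class below is a hypothetical illustration of that builder shape, not the class that was eventually added to Flink:

```java
// Immutable slot sharing group spec: all mutation happens on the builder,
// and build() snapshots the current state into an independent object.
final class SlotSharingGroupSpec {

    private final String name;
    private final double cpuCores;

    private SlotSharingGroupSpec(String name, double cpuCores) {
        this.name = name;
        this.cpuCores = cpuCores;
    }

    public static Builder newBuilder(String name) {
        return new Builder(name);
    }

    public String getName() {
        return name;
    }

    public double getCpuCores() {
        return cpuCores;
    }

    public static final class Builder {
        private final String name;
        private double cpuCores = 1.0; // assumed default, for illustration

        private Builder(String name) {
            this.name = name;
        }

        public Builder setCpuCores(double cpuCores) {
            this.cpuCores = cpuCores;
            return this;
        }

        public SlotSharingGroupSpec build() {
            return new SlotSharingGroupSpec(name, cpuCores);
        }
    }
}
```

With this shape, the problematic sequence from the quoted mail becomes two build() calls producing two distinct specs, so the first operator keeps its 2-core spec even after the builder is changed to 4 cores.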
[VOTE] FLIP-169: DataStream API for Fine-Grained Resource Requirements
Hi everyone, I'd like to start the vote of FLIP-169 [1]. This FLIP is discussed in the thread[2]. The vote will be open for at least 72 hours. Unless there is an objection, I will try to close it by Jun. 18, 2021 if we have received sufficient votes. [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-169+DataStream+API+for+Fine-Grained+Resource+Requirements [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-169-DataStream-API-for-Fine-Grained-Resource-Requirements-td51071.html Best, Yangze Guo
Apply for FLIP Wiki Page Edit Permission
Hi, I want to contribute to Flink, but I can't create a FLIP wiki page at the moment. Would someone please give me the Confluence permission? My Confluence ID is roc-marshal. My full name is RocMarshal. My JIRA ID is RocMarshal. Thank you. Best, Roc.
[jira] [Created] (FLINK-22995) Failed to acquire lease 'ConfigMapLock: .... retrying...
Bhagi created FLINK-22995: - Summary: Failed to acquire lease 'ConfigMapLock: retrying... Key: FLINK-22995 URL: https://issues.apache.org/jira/browse/FLINK-22995 Project: Flink Issue Type: Bug Components: Deployment / Kubernetes Affects Versions: 1.13.1 Reporter: Bhagi Attachments: jobamanger.log Hi Team, I have deployed a Flink session cluster on standalone Kubernetes with JobManager HA (Kubernetes HA service). When I submit jobs, all of them fail due to a JobManager leader election issue. Attaching the logs.
Re: [DISCUSS] FLIP-129 (Update): Registering sources/sinks on Table API without SQL
Thanks Ingo for picking up this FLIP. FLIP-129 is an important piece to have a complete Table SQL story, and users have been waiting for a long time. Let's finish it in this release! Your proposed changes look good to me. I also cc'd people who voted in previous FLIP-129.

Best,
Jark

On Thu, 10 Jun 2021 at 19:46, Ingo Bürk wrote:
> Hello everyone,
>
> we would like to pick up work on FLIP-129 which aims to improve the Table
> API by supporting the creation of sources / sinks without having to go
> through SQL/DDL. This FLIP was approved a while ago, and some things have
> changed since then. We'd like to propose a few changes, see [1], before
> starting work on it.
> Our proposal is mainly motivated by reducing the scope in some parts to
> improve maintainability and relying more on ConfigOptions being the single
> source of truth. We also want to expose this functionality for
> non-temporary tables.
>
> We'd like to open this for discussion to collect any feedback. Once the
> discussion has stabilized I'll update the FLIP itself and start a new vote.
>
> [1]
> https://docs.google.com/document/d/1tpirvF0u723QF005UrgdbvF-Tp0Jbg_qhlbda4mk7Ck/edit?usp=sharing
>
> Regards
> Ingo
[jira] [Created] (FLINK-22994) Improve the performance of invoking nesting udf
lynn1.zhang created FLINK-22994: --- Summary: Improve the performance of invoking nesting udf Key: FLINK-22994 URL: https://issues.apache.org/jira/browse/FLINK-22994 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.12.4 Environment: h5. Reporter: lynn1.zhang Attachments: image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png h1. Background Flink maintains udf results as binary, like BinaryStringData. When invoking nested udfs like select useless(int_ip_2_string(ip)), the result of int_ip_2_string(ip) will be serialized and deserialized. Below is the generated code: !image-2021-06-15-15-18-12-619.png! This issue wants to improve it as below: !image-2021-06-15-15-19-01-103.png! h1. Performance Comparison Conditions: Source = Kafka, Schema = PB with snappy; Flink Slot = 1, taskmanager.memory.process.size = 4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU @ 2.30GHz UDF Introduction: * ipip: input: int ip, output: map ip_info, map size = 14. * ip_2_country: input: map ip_info, output: string country. * ip_2_region: input: map ip_info, output: string region. * ip_2_isp_domain: input: map ip_info, output: string isp. * ip_2_timezone: input: map ip_info, output: string timezone. h5. The throughput without udf invocation: 764.50 w/s !image-2021-06-15-15-27-26-739.png! h5. The throughput with udf invocation: 183.24 k/s !image-2021-06-15-15-28-28-137.png! h5. The throughput with nested udf invocation: 41.42 k/s !image-2021-06-15-15-29-09-773.png! h5. The throughput with nested udf invocation after this issue: 174.41 k/s !image-2021-06-15-15-30-14-775.png!
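The cost described above can be illustrated with a self-contained sketch: today the inner UDF's result takes a serialize/deserialize round trip before the outer UDF sees it, whereas the proposed change passes the object straight through. The methods below are simplified stand-ins named after the UDFs in this issue; they are not Flink's BinaryStringData or its generated code:

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: shows why a serialize/deserialize round trip between
// nested UDF calls is wasteful. All classes and methods are stand-ins.
class NestedUdfSketch {

    // Inner UDF: format an IPv4 address stored as an int in dotted notation.
    public static String intIp2String(int ip) {
        return ((ip >>> 24) & 0xff) + "." + ((ip >>> 16) & 0xff) + "."
                + ((ip >>> 8) & 0xff) + "." + (ip & 0xff);
    }

    // Outer UDF: an arbitrary function consuming the inner result.
    public static int useless(String s) {
        return s.length();
    }

    // Current behavior (simplified): the inner result is materialized
    // through a binary form before the outer UDF runs.
    public static int evalWithRoundTrip(int ip) {
        byte[] binary = intIp2String(ip).getBytes(StandardCharsets.UTF_8); // serialize
        String materialized = new String(binary, StandardCharsets.UTF_8);  // deserialize
        return useless(materialized);
    }

    // Proposed behavior (simplified): pass the object straight through.
    public static int evalDirect(int ip) {
        return useless(intIp2String(ip));
    }
}
```

Both paths compute the same value; the round trip only adds per-record allocation and copying, which is what the throughput numbers above measure.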
[jira] [Created] (FLINK-22993) CompactFileWriter won't emit EndCheckpoint with Long.MAX_VALUE checkpointId
luoyuxia created FLINK-22993: Summary: CompactFileWriter won't emit EndCheckpoint with Long.MAX_VALUE checkpointId Key: FLINK-22993 URL: https://issues.apache.org/jira/browse/FLINK-22993 Project: Flink Issue Type: Bug Components: Connectors / FileSystem, Connectors / Hive Affects Versions: 1.13.0 Reporter: luoyuxia Fix For: 1.14.0 CompactFileWriter won't emit EndCheckpoint with Long.MAX_VALUE checkpointId even though the inputs end, which causes data loss. For example, after completing checkpoint 5, CompactFileWriter writes some files; then the inputs end, but it won't emit EndCheckpoint with Long.MAX_VALUE checkpointId, so the downstream operator won't compact those files, which means they remain invisible.
[jira] [Created] (FLINK-22992) Move SSLUtils#is*Enabled to SecurityOptions
Chesnay Schepler created FLINK-22992: Summary: Move SSLUtils#is*Enabled to SecurityOptions Key: FLINK-22992 URL: https://issues.apache.org/jira/browse/FLINK-22992 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.14.0 The SSLUtils contain a few methods that check whether certain SSL options are enabled while honoring a fallback "security.ssl.enabled" option. These are required by RPC, network and REST components. In order to be able to move our akka rpc stuff into a separate module it is necessary to move these to flink-core. (There's also no good reason for them to be in flink-runtime in the first place)
[jira] [Created] (FLINK-22991) Use Optional in NetworkBufferPool
Chesnay Schepler created FLINK-22991: Summary: Use Optional in NetworkBufferPool Key: FLINK-22991 URL: https://issues.apache.org/jira/browse/FLINK-22991 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.14.0
[jira] [Created] (FLINK-22990) Remove unused ScalaUtils/TestingMessages
Chesnay Schepler created FLINK-22990: Summary: Remove unused ScalaUtils/TestingMessages Key: FLINK-22990 URL: https://issues.apache.org/jira/browse/FLINK-22990 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.14.0
[jira] [Created] (FLINK-22989) Port Tasks to Java
Chesnay Schepler created FLINK-22989: Summary: Port Tasks to Java Key: FLINK-22989 URL: https://issues.apache.org/jira/browse/FLINK-22989 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination, Tests Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.14.0
[jira] [Created] (FLINK-22988) Port TestingUtils to Java
Chesnay Schepler created FLINK-22988: Summary: Port TestingUtils to Java Key: FLINK-22988 URL: https://issues.apache.org/jira/browse/FLINK-22988 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination, Tests Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.14.0
[jira] [Created] (FLINK-22987) Scala suffix check isn't working
Chesnay Schepler created FLINK-22987: Summary: Scala suffix check isn't working Key: FLINK-22987 URL: https://issues.apache.org/jira/browse/FLINK-22987 Project: Flink Issue Type: Bug Components: Build System / CI Affects Versions: 1.12.4 Reporter: Chesnay Schepler Fix For: 1.14.0, 1.12.5, 1.13.2 The scala suffix check (tools/ci/verify_scala_suffixes.sh) isn't working because:
* /dev/tty isn't available
* the module names introduced in FLINK-18607 broke the parsing.