[jira] [Created] (FLINK-30072) Cannot assign instance of SerializedLambda to field KeyGroupStreamPartitioner.keySelector
Nico Kruber created FLINK-30072:
---

Summary: Cannot assign instance of SerializedLambda to field KeyGroupStreamPartitioner.keySelector
Key: FLINK-30072
URL: https://issues.apache.org/jira/browse/FLINK-30072
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 1.16.0
Reporter: Nico Kruber

In application mode, if the {{usrlib}} directories of the JM and TM differ, e.g. same jars but different names, the job fails with this cryptic exception on the JM:

{code}
2022-11-17 09:55:12,968 INFO org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Restarting job.
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
	at org.apache.flink.streaming.api.graph.StreamConfig.getVertexNonChainedOutputs(StreamConfig.java:537) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1600) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriterDelegate(StreamTask.java:1584) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:408) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:362) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:335) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:327) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:317) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.<init>(SourceOperatorStreamTask.java:84) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at jdk.internal.reflect.GeneratedConstructorAccessor38.newInstance(Unknown Source) ~[?:?]
	at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
	at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
	at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1589) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:714) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector of type org.apache.flink.api.java.functions.KeySelector in instance of org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(Unknown Source) ~[?:?]
	at java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(Unknown Source) ~[?:?]
	at java.io.ObjectStreamClass.checkObjFieldValueTypes(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.defaultCheckFieldValues(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
	at java.util.ArrayList.readObject(Unknown Source) ~[?:?]
	at jdk.internal.reflect.GeneratedMethodAccessor16.invoke(Unknown Source) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
	at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
	at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	...
{code}
[jira] [Created] (FLINK-30045) FromClasspathEntryClassInformationProvider too eager to verify MainClass
Nico Kruber created FLINK-30045:
---

Summary: FromClasspathEntryClassInformationProvider too eager to verify MainClass
Key: FLINK-30045
URL: https://issues.apache.org/jira/browse/FLINK-30045
Project: Flink
Issue Type: Bug
Components: Client / Job Submission
Affects Versions: 1.16.0, 1.17.0
Reporter: Nico Kruber
Assignee: Nico Kruber
Fix For: 1.17.0, 1.16.1

{{FromClasspathEntryClassInformationProvider}} eagerly attempts to verify whether the given MainClass is on the user classpath. However, it doesn't handle cases where the main class is inside a nested jar. This is something you would see when using such a nested jar file with the {{StandaloneApplicationClusterEntryPoint}}, e.g. from {{standalone-job.sh}}.

We actually don't need this check at all since {{PackagedProgram}} already performs it while attempting to load the main class. Having it once should be enough.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-29884) Flaky test failure in finegrained_resource_management/SortMergeResultPartitionTest.testRelease
Nico Kruber created FLINK-29884:
---

Summary: Flaky test failure in finegrained_resource_management/SortMergeResultPartitionTest.testRelease
Key: FLINK-29884
URL: https://issues.apache.org/jira/browse/FLINK-29884
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination, Runtime / Network, Tests
Affects Versions: 1.17.0
Reporter: Nico Kruber
Fix For: 1.17.0

{{SortMergeResultPartitionTest.testRelease}} failed with a timeout in the finegrained_resource_management tests:

{code:java}
Nov 03 17:28:07 [ERROR] Tests run: 20, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 64.649 s <<< FAILURE! - in org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionTest
Nov 03 17:28:07 [ERROR] SortMergeResultPartitionTest.testRelease  Time elapsed: 60.009 s  <<< ERROR!
Nov 03 17:28:07 org.junit.runners.model.TestTimedOutException: test timed out after 60 seconds
Nov 03 17:28:07 	at org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionTest.testRelease(SortMergeResultPartitionTest.java:374)
Nov 03 17:28:07 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
Nov 03 17:28:07 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Nov 03 17:28:07 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Nov 03 17:28:07 	at java.lang.reflect.Method.invoke(Method.java:498)
Nov 03 17:28:07 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
Nov 03 17:28:07 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
Nov 03 17:28:07 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
Nov 03 17:28:07 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
Nov 03 17:28:07 	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
Nov 03 17:28:07 	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
Nov 03 17:28:07 	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
Nov 03 17:28:07 	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
Nov 03 17:28:07 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
Nov 03 17:28:07 	at java.lang.Thread.run(Thread.java:748)
{code}

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42806=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-29868) Dependency convergence error for org.osgi:org.osgi.core:jar
Nico Kruber created FLINK-29868:
---

Summary: Dependency convergence error for org.osgi:org.osgi.core:jar
Key: FLINK-29868
URL: https://issues.apache.org/jira/browse/FLINK-29868
Project: Flink
Issue Type: Bug
Components: Build System, Table SQL / Runtime
Affects Versions: 1.17.0
Reporter: Nico Kruber
Fix For: 1.17.0

While working on FLINK-29867, the following new error is popping up while running

{code}
./mvnw clean install -pl flink-dist -am -DskipTests -Dflink.convergence.phase=install -Pcheck-convergence
{code}

(this is also done by CI which therefore fails)

{code}
[WARNING] Dependency convergence error for org.osgi:org.osgi.core:jar:4.3.0:runtime paths to dependency are:
+-org.apache.flink:flink-table-planner-loader-bundle:jar:1.17-SNAPSHOT
  +-org.apache.flink:flink-table-planner_2.12:jar:1.17-SNAPSHOT:runtime
    +-org.apache.flink:flink-table-api-java-bridge:jar:1.17-SNAPSHOT:runtime
      +-org.apache.flink:flink-streaming-java:jar:1.17-SNAPSHOT:runtime
        +-org.apache.flink:flink-runtime:jar:1.17-SNAPSHOT:runtime
          +-org.xerial.snappy:snappy-java:jar:1.1.8.3:runtime
            +-org.osgi:org.osgi.core:jar:4.3.0:runtime
and
+-org.apache.flink:flink-table-planner-loader-bundle:jar:1.17-SNAPSHOT
  +-org.apache.flink:flink-table-planner_2.12:jar:1.17-SNAPSHOT:runtime
    +-org.apache.flink:flink-scala_2.12:jar:1.17-SNAPSHOT:runtime
      +-org.apache.flink:flink-core:jar:1.17-SNAPSHOT:runtime
        +-org.apache.commons:commons-compress:jar:1.21:runtime
          +-org.osgi:org.osgi.core:jar:6.0.0:runtime
{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-29867) Update maven-enforcer-plugin to 3.1.0
Nico Kruber created FLINK-29867:
---

Summary: Update maven-enforcer-plugin to 3.1.0
Key: FLINK-29867
URL: https://issues.apache.org/jira/browse/FLINK-29867
Project: Flink
Issue Type: Improvement
Components: Build System
Reporter: Nico Kruber
Assignee: Nico Kruber
Fix For: 1.17.0

We currently rely on 3.0.0-M1 but will have to skip 3.0.0 (final) due to MENFORCER-394, which hits Flink's current code base as well.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-29865) Allow configuring the JDK in build-nightly-dist.yml
Nico Kruber created FLINK-29865:
---

Summary: Allow configuring the JDK in build-nightly-dist.yml
Key: FLINK-29865
URL: https://issues.apache.org/jira/browse/FLINK-29865
Project: Flink
Issue Type: Improvement
Components: Build System / Azure Pipelines
Reporter: Nico Kruber
Assignee: Nico Kruber
Fix For: 1.17.0

{{build-nightly-dist.yml}} currently uses the default JDK from https://github.com/flink-ci/flink-ci-docker, which happens to be the Java 1.8 that we use for releases. We should
# not rely on this default being set to 1.8, and
# be able to configure this in the workflows themselves.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-29643) Possible NPE in ApplicationDispatcherBootstrap with failedJob submission and no HA
Nico Kruber created FLINK-29643:
---

Summary: Possible NPE in ApplicationDispatcherBootstrap with failedJob submission and no HA
Key: FLINK-29643
URL: https://issues.apache.org/jira/browse/FLINK-29643
Project: Flink
Issue Type: Bug
Components: Client / Job Submission
Affects Versions: 1.15.2, 1.16.0, 1.17.0
Reporter: Nico Kruber

If
- {{PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID}} is not set, and
- high availability is not activated, and
- {{DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR}} is set,

then a failure during job submission may lead to an NPE: the code in {{ApplicationDispatcherBootstrap#runApplicationEntryPoint()}} tries to read the {{failedJobId}} from the configuration, where it will not be present in these cases. Please refer to the conditions that set the {{jobId}} in {{ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync()}}.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
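A minimal, self-contained sketch of the kind of guard this implies (the configuration key and class names here are illustrative, not Flink's actual API): only dereference the configured job id when it was actually set.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: read an optional job id instead of assuming it is set. */
class FailedJobIdLookup {
    // Illustrative key name, not the real PipelineOptionsInternal constant.
    static final String FIXED_JOB_ID_KEY = "pipeline.fixed-job-id";

    /** Returns the configured id, if any, so callers cannot hit an NPE. */
    static Optional<String> failedJobId(Map<String, String> conf) {
        return Optional.ofNullable(conf.get(FIXED_JOB_ID_KEY));
    }
}
```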
[jira] [Created] (FLINK-29208) Logs and stdout endpoints not mentioned on OpenAPI spec
Nico Kruber created FLINK-29208:
---

Summary: Logs and stdout endpoints not mentioned on OpenAPI spec
Key: FLINK-29208
URL: https://issues.apache.org/jira/browse/FLINK-29208
Project: Flink
Issue Type: Bug
Components: Runtime / REST
Affects Versions: 1.15.2
Reporter: Nico Kruber

Using Flink's web UI and clicking on "Stdout" or "Logs" in a JM or TM accesses the endpoints {{/jobmanager/logs}} and {{/jobmanager/stdout}} (and similar for TMs), but these don't seem to exist in the [REST API docs|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/] or the [REST API OpenAPI spec|https://nightlies.apache.org/flink/flink-docs-master/generated/rest_v1_dispatcher.yml].

Either these should become some webui-internal APIs (for which no concept exists at the moment), or these endpoints should be added to the docs and spec.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-27836) RocksDBMapState iteration may stop too early for var-length prefixes
Nico Kruber created FLINK-27836:
---

Summary: RocksDBMapState iteration may stop too early for var-length prefixes
Key: FLINK-27836
URL: https://issues.apache.org/jira/browse/FLINK-27836
Project: Flink
Issue Type: Bug
Components: Runtime / State Backends
Affects Versions: 1.14.4, 1.13.6, 1.15.0
Reporter: Nico Kruber

A similar, yet orthogonal, issue to https://issues.apache.org/jira/browse/FLINK-11141 is that the iterators used in RocksDBMapState iterate over everything with a matching prefix of flink-key and namespace. With var-length serializers for either of them, however, it may return data for unrelated keys and/or namespaces.

It looks like the built-in serializers of Flink are not affected though, since they use a var-length encoding that is prefixed with the object's length and thus different lengths will not have the same prefix. More exotic serializers, e.g. relying on a terminating NUL character, may expose the above-mentioned behaviour, though.

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
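The length-prefix argument above can be illustrated with a tiny sketch (the encoders here are made up for illustration, not Flink's serializers): with a raw var-length encoding, the bytes of a shorter key are a prefix of a longer key's bytes, so a prefix-based iterator would match both; with a length-prefixed encoding, the first byte already differs.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative encoders showing why length prefixes avoid prefix collisions. */
class PrefixDemo {
    /** Raw var-length encoding: just the UTF-8 bytes. */
    static byte[] raw(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    /** Length-prefixed encoding (single-byte length, for brevity). */
    static byte[] lengthPrefixed(String s) {
        byte[] b = raw(s);
        byte[] out = new byte[b.length + 1];
        out[0] = (byte) b.length;
        System.arraycopy(b, 0, out, 1, b.length);
        return out;
    }

    /** True iff p is a byte prefix of full, as a prefix iterator would check. */
    static boolean isPrefix(byte[] p, byte[] full) {
        if (p.length > full.length) {
            return false;
        }
        for (int i = 0; i < p.length; i++) {
            if (p[i] != full[i]) {
                return false;
            }
        }
        return true;
    }
}
```

Here, `raw("ab")` is a prefix of `raw("abc")`, so iterating over key "ab" would also surface entries of key "abc"; `lengthPrefixed("ab")` starts with the byte 2 while `lengthPrefixed("abc")` starts with 3, so the prefixes never collide.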
[jira] [Created] (FLINK-27353) Update training exercises to use Flink 1.15
Nico Kruber created FLINK-27353:
---

Summary: Update training exercises to use Flink 1.15
Key: FLINK-27353
URL: https://issues.apache.org/jira/browse/FLINK-27353
Project: Flink
Issue Type: New Feature
Components: Documentation / Training / Exercises
Reporter: Nico Kruber
Assignee: Nico Kruber
Fix For: 1.15.0

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
[jira] [Created] (FLINK-27322) Add license headers and spotless checks for them
Nico Kruber created FLINK-27322:
---

Summary: Add license headers and spotless checks for them
Key: FLINK-27322
URL: https://issues.apache.org/jira/browse/FLINK-27322
Project: Flink
Issue Type: Bug
Components: Documentation / Training / Exercises
Affects Versions: 1.14.4
Reporter: Nico Kruber
Assignee: Nico Kruber

It looks as if there are a couple of files that are missing their appropriate license headers, e.g. https://github.com/apache/flink-training/blob/0b1c83b16065484200564402bef2ca10ef19cb30/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/RideAndFare.java

We should fix that by:
# adding the missing license headers
# adding spotless checks to ensure this doesn't happen again

Potential downside: if a user doing the training exercises creates files on their own, these would need the license header as well. On the other hand, a simple {{./gradlew spotlessApply}} can fix that easily.

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
[jira] [Created] (FLINK-26852) RocksDBMapState#clear not forwarding exceptions
Nico Kruber created FLINK-26852:
---

Summary: RocksDBMapState#clear not forwarding exceptions
Key: FLINK-26852
URL: https://issues.apache.org/jira/browse/FLINK-26852
Project: Flink
Issue Type: Bug
Components: Runtime / State Backends
Affects Versions: 1.14.4, 1.15.0
Reporter: Nico Kruber

I accidentally found an inconsistent behaviour in the RocksDB state backend implementation: if there's an exception in {{AbstractRocksDBState#clear()}}, it will forward that inside a {{FlinkRuntimeException}}. However, if there's an exception in {{RocksDBMapState#clear}}, it will merely print the exception stacktrace and continue as is.

I assume forwarding the exception as {{FlinkRuntimeException}} should be the desired behaviour for both use cases...

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
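The consistent behaviour suggested here can be sketched in a few lines (the interface and class names below are stand-ins for illustration, not Flink classes): wrap the backend error in a runtime exception instead of only printing it.

```java
/** Illustrative stand-in for the RocksDB interaction; not a Flink class. */
class StateClearSketch {
    interface Backend {
        void delete(String key) throws Exception;
    }

    /** Before: catch + printStackTrace(). After: forward the error to the caller. */
    static void clear(Backend db, String key) {
        try {
            db.delete(key);
        } catch (Exception e) {
            // mirrors AbstractRocksDBState#clear(): do not swallow the error
            throw new RuntimeException("Error while removing entry from RocksDB", e);
        }
    }
}
```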
[jira] [Created] (FLINK-25362) Incorrect dependencies in Table Confluent/Avro docs
Nico Kruber created FLINK-25362:
---

Summary: Incorrect dependencies in Table Confluent/Avro docs
Key: FLINK-25362
URL: https://issues.apache.org/jira/browse/FLINK-25362
Project: Flink
Issue Type: Bug
Components: Documentation
Affects Versions: 1.14.2, 1.13.5, 1.12.7
Reporter: Nico Kruber
Assignee: Nico Kruber
Fix For: 1.14.3

"Confluent Avro Format" is missing an explanation to also
* add the dependency to flink-avro
* have the confluent repository defined

"Avro Format" should not show the maven dependency to {{flink-sql-avro}} but instead {{flink-avro}}.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
[jira] [Created] (FLINK-25027) Allow GC of a finished job's JobMaster before the slot timeout is reached
Nico Kruber created FLINK-25027:
---

Summary: Allow GC of a finished job's JobMaster before the slot timeout is reached
Key: FLINK-25027
URL: https://issues.apache.org/jira/browse/FLINK-25027
Project: Flink
Issue Type: Improvement
Components: Runtime / Coordination
Affects Versions: 1.13.3, 1.12.5, 1.14.0
Reporter: Nico Kruber
Attachments: image-2021-11-23-20-32-20-479.png

In a session cluster, after a (batch) job is finished, the JobMaster seems to stick around for another couple of minutes before becoming eligible for garbage collection.

Looking into a heap dump, this seems to be tied to a {{PhysicalSlotRequestBulkCheckerImpl}} which is enqueued in the underlying Akka executor (and keeps the JM from being GC'd). Per default, the action is scheduled for {{slot.request.timeout}}, which defaults to 5 min (thanks [~trohrmann] for helping out here).

!image-2021-11-23-20-32-20-479.png!

With this setting, you have to account for enough metaspace to cover 5 minutes of time, which may span a couple of jobs, needlessly!

The problem seems to be that Flink is using the main thread executor for the scheduling, which uses the {{ActorSystem}}'s scheduler, and the future task scheduled with Akka can (probably) not be easily cancelled. One idea could be to use a dedicated thread pool per JM that we shut down when the JM terminates. That way we would not keep the JM from being GC'd.

(The concrete example we investigated was a DataSet job)

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
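The dedicated-thread-pool idea can be sketched as follows (a simplified illustration of the concept, not Flink's actual scheduler wiring): shutting the pool down on termination drops still-pending delayed tasks, so they no longer pin the JobMaster in memory.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** One scheduler per JobMaster, closed together with it (illustrative). */
class PerJobScheduler implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) {
        return scheduler.schedule(task, delay, unit);
    }

    boolean isShutdown() {
        return scheduler.isShutdown();
    }

    @Override
    public void close() {
        // Drains queued delayed tasks, releasing any references they still hold
        // (e.g. to the enclosing JobMaster), unlike a shared actor-system scheduler.
        scheduler.shutdownNow();
    }
}
```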
[jira] [Created] (FLINK-25023) ClassLoader leak on JM/TM through indirectly-started Hadoop threads out of user code
Nico Kruber created FLINK-25023:
---

Summary: ClassLoader leak on JM/TM through indirectly-started Hadoop threads out of user code
Key: FLINK-25023
URL: https://issues.apache.org/jira/browse/FLINK-25023
Project: Flink
Issue Type: Bug
Components: Connectors / FileSystem, Connectors / Hadoop Compatibility, FileSystems
Affects Versions: 1.13.3, 1.12.5, 1.14.0
Reporter: Nico Kruber

If a Flink job is using HDFS through Flink's filesystem abstraction (either on the JM or TM), that code may actually spawn a few threads, e.g. from static class members:
* {{org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner}}
* {{IPC Parameter Sending Thread#*}}

These threads are started as soon as the classes are loaded, which may happen in the context of the user code. In this specific scenario, the created threads may contain references to the context class loader (I did not see that though) or, as happened here, may inherit thread contexts such as the {{ProtectionDomain}} (from an {{AccessController}}). Hence user contexts and user class loaders are leaked into long-running threads that run in Flink's (parent) classloader.

Fortunately, it seems to only *leak a single* {{ChildFirstClassLoader}} in this concrete example, but that may depend on which code paths each client execution is walking.

A *proper solution* doesn't seem so simple:
* We could try to proactively initialize available file systems in the hope of starting all threads in the parent classloader with the parent context.
* We could create a default {{ProtectionDomain}} for spawned threads as discussed at [https://dzone.com/articles/javalangoutofmemory-permgen]; however, the {{StatisticsDataReferenceCleaner}} isn't actually spawned from any callback but from a static variable, and thus with the class loading itself (but maybe this is still possible somehow).

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
[jira] [Created] (FLINK-25022) ClassLoader leak with ThreadLocals on the JM when submitting a job through the REST API
Nico Kruber created FLINK-25022:
---

Summary: ClassLoader leak with ThreadLocals on the JM when submitting a job through the REST API
Key: FLINK-25022
URL: https://issues.apache.org/jira/browse/FLINK-25022
Project: Flink
Issue Type: Bug
Components: Runtime / REST
Affects Versions: 1.13.3, 1.12.5, 1.14.0
Reporter: Nico Kruber

If a job is submitted using the REST API's {{/jars/:jarid/run}} endpoint, user code has to be executed on the JobManager, and it is doing this in a couple of (pooled) dispatcher threads like {{Flink-DispatcherRestEndpoint-thread-*}}. If the user code is using thread locals (and not cleaning them up), they may remain in the thread with references to the {{ChildFirstClassloader}} of the job and thus leak it.

We saw this for the {{jsoniter}} scala library at the JM, which [creates ThreadLocal instances|https://github.com/plokhotnyuk/jsoniter-scala/blob/95c7053cfaa558877911f3448382f10d53c4fcbf/jsoniter-scala-core/jvm/src/main/scala/com/github/plokhotnyuk/jsoniter_scala/core/package.scala] but doesn't remove them; it can actually happen with any user code or (worse) any library used in user code.

There are a few *workarounds* a user can use, e.g. putting the library in Flink's lib/ folder or submitting via the Flink CLI, but these may actually not be possible to use, depending on the circumstances.

A *proper fix* should happen in Flink by guarding against any of these things in the dispatcher threads. We could, for example, spawn a separate thread for executing the user's {{main()}} method and, once the job is submitted, exit that thread and destroy all thread locals along with it.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
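The proposed fix can be sketched in a few lines (illustrative only, not the Flink implementation): run the user's {{main()}} in a short-lived thread, so any ThreadLocals it creates become unreachable when that thread dies, instead of lingering in a pooled dispatcher thread.

```java
/** Illustrative sketch: isolate user code's ThreadLocals in a throwaway thread. */
class IsolatedUserMain {
    static void runUserMain(Runnable userMain) throws InterruptedException {
        Thread t = new Thread(userMain, "user-main");
        t.start();
        // The thread dies here; its ThreadLocal map (and anything it references,
        // such as the job's classloader) becomes eligible for GC with it.
        t.join();
    }
}
```

A pooled dispatcher thread calling `runUserMain()` never sees the user thread's ThreadLocal values, because ThreadLocal storage is per-thread.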
[jira] [Created] (FLINK-24769) FlameGraphs in 1.14 do not aggregate subtasks' stack traces anymore
Nico Kruber created FLINK-24769:
---

Summary: FlameGraphs in 1.14 do not aggregate subtasks' stack traces anymore
Key: FLINK-24769
URL: https://issues.apache.org/jira/browse/FLINK-24769
Project: Flink
Issue Type: Bug
Components: Runtime / Web Frontend
Affects Versions: 1.14.0
Reporter: Nico Kruber
Attachments: image-2021-11-04-12-59-24-308.png

Since Flink 1.14.0, after enabling FlameGraphs and gathering statistics for a task, it doesn't aggregate the results from the parallel instances anymore but instead shows each individual one - something that easily gets too messy for higher parallelism.

It seems the last shared method on the stack is {{Task.runWithSystemExitMonitoring}} and then it spawns off into individual lambda functions:

!image-2021-11-04-12-59-24-308.png!

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-24411) Add docs navigation for release notes
Nico Kruber created FLINK-24411:
---

Summary: Add docs navigation for release notes
Key: FLINK-24411
URL: https://issues.apache.org/jira/browse/FLINK-24411
Project: Flink
Issue Type: Improvement
Components: Documentation
Affects Versions: 1.13.2, 1.12.5, 1.14.0, 1.11.4
Reporter: Nico Kruber

I propose to add a "Release Notes" section into the documentation's navigation bar for things like https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/

At the moment, I feel a bit lost in the navigation when viewing that page (which is only linked from the docs home page, which I barely ever look at).

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-24144) Improve DataGenerator to prevent excessive creation of new Random objects
Nico Kruber created FLINK-24144:
---

Summary: Improve DataGenerator to prevent excessive creation of new Random objects
Key: FLINK-24144
URL: https://issues.apache.org/jira/browse/FLINK-24144
Project: Flink
Issue Type: Improvement
Components: Documentation / Training / Exercises
Affects Versions: 1.13.2, 1.14.0
Reporter: Nico Kruber
Assignee: Nico Kruber

For a couple of methods in {{DataGenerator}}, new {{Random}} objects are created with a specific seed. Instead, we could create a single {{Random}} object and reset the seed when needed.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
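A sketch of the proposed change (class and method names are illustrative, not the actual {{DataGenerator}} API): {{Random#setSeed(long)}} resets the generator as if it had just been constructed with that seed, so one reusable instance produces the same values as a fresh {{new Random(seed)}} per call, without the allocation.

```java
import java.util.Random;

/** Illustrative: reuse one Random instead of allocating one per call. */
class SeededValues {
    private final Random rnd = new Random();

    /** Behaves like new Random(seed).nextLong() without creating a new object. */
    long longFromSeed(long seed) {
        rnd.setSeed(seed); // resets state as if freshly constructed with 'seed'
        return rnd.nextLong();
    }
}
```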
[jira] [Created] (FLINK-24115) Outdated SQL Temporal Join Example
Nico Kruber created FLINK-24115:
---

Summary: Outdated SQL Temporal Join Example
Key: FLINK-24115
URL: https://issues.apache.org/jira/browse/FLINK-24115
Project: Flink
Issue Type: Bug
Components: Documentation, Table SQL / API
Reporter: Nico Kruber

[https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#event-time-temporal-join] is missing a primary key in the Table DDL. Also, the following note does not match the current example anymore:

{quote}
Note: The event-time temporal join requires the primary key contained in the equivalence condition of the temporal join condition, e.g., The primary key P.product_id of table product_changelog to be constrained in the condition O.product_id = P.product_id.
{quote}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-24022) Re-Enable Scala checks in flink-training CI
Nico Kruber created FLINK-24022:
---

Summary: Re-Enable Scala checks in flink-training CI
Key: FLINK-24022
URL: https://issues.apache.org/jira/browse/FLINK-24022
Project: Flink
Issue Type: Bug
Components: Documentation / Training / Exercises
Affects Versions: 1.14.0, 1.13.3
Reporter: Nico Kruber
Assignee: Nico Kruber
Fix For: 1.14.0, 1.13.3

FLINK-23339 disabled Scala by default and thereby also disabled CI for newly checked-in changes to the Scala code. We should run CI with Scala enabled.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-24016) Restore 1.12 SQL docs page on state retention
Nico Kruber created FLINK-24016:
---

Summary: Restore 1.12 SQL docs page on state retention
Key: FLINK-24016
URL: https://issues.apache.org/jira/browse/FLINK-24016
Project: Flink
Issue Type: Improvement
Components: Documentation, Table SQL / API
Affects Versions: 1.13.2, 1.14.0
Reporter: Nico Kruber

It seems that the whole [section about state retention from the docs|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/query_configuration.html#idle-state-retention-time] in Flink 1.12 vanished with Flink 1.13. It was outdated with its min/max settings, but instead of being updated, it was just removed; state retention/TTL is now safely hidden in https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-state-ttl

The discussion in the 1.12 docs is, however, superior since it explains a bit more why we need it and the types of queries that need it. I therefore propose to restore it somewhere in the docs.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-24008) Support state cleanup based on unique keys
Nico Kruber created FLINK-24008:
---

Summary: Support state cleanup based on unique keys
Key: FLINK-24008
URL: https://issues.apache.org/jira/browse/FLINK-24008
Project: Flink
Issue Type: New Feature
Components: Table SQL / Runtime
Affects Versions: 1.14.0
Reporter: Nico Kruber

In a join of two tables where we join on unique columns, e.g. from primary keys, we could clean up join state more pro-actively since we know whether future joins with this row are still possible (assuming uniqueness of that key). While this may not solve all issues of growing state in non-time-based joins, it may still considerably reduce state size, depending on the involved columns.

This would add one more way of expiring state that the operator stores; currently there are only these:
* time-based joins, e.g. interval join
* idle state retention via {{TableConfig#setIdleStateRetention()}}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-23993) Describe eventually-consistency of materializing upserts
Nico Kruber created FLINK-23993:
---

Summary: Describe eventually-consistency of materializing upserts
Key: FLINK-23993
URL: https://issues.apache.org/jira/browse/FLINK-23993
Project: Flink
Issue Type: Sub-task
Components: Documentation, Table SQL / Ecosystem
Affects Versions: 1.14.0
Reporter: Nico Kruber

FLINK-20374 added an upsert materialization operator which fixes the order of shuffled streams. The results of this operator are actually _eventually consistent_ (it collects the latest value it has seen and retracts older versions when these are not valid anymore).

You could see a result stream like this, based on the order the materialization receives events:

+I10, -I10, +I5, -I5, +I10, -I10, +I3, -I3, +I10

Each time, the value stored in Kafka would change until the "final" result is in. It may be acceptable for upsert sinks, but should be documented (or changed/fixed) nonetheless.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-23839) Unclear severity of Kafka transaction recommit warning in logs
Nico Kruber created FLINK-23839: --- Summary: Unclear severity of Kafka transaction recommit warning in logs Key: FLINK-23839 URL: https://issues.apache.org/jira/browse/FLINK-23839 Project: Flink Issue Type: Sub-task Components: Connectors / Kafka Affects Versions: 1.13.2, 1.12.5, 1.11.4 Reporter: Nico Kruber In a transactional Kafka sink, after recovery, all transactions from the recovered checkpoint are recommitted even though they may have already been committed before because this is not part of the checkpoint. This second commit can lead to a number of WARN entries in the logs coming from [KafkaCommitter|https://github.com/apache/flink/blob/6c9818323b41a84137c52822d2993df788dbc9bb/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java#L66] or [FlinkKafkaProducer|https://github.com/apache/flink/blob/6c9818323b41a84137c52822d2993df788dbc9bb/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1057]. Examples: {code} WARN [org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer] ... Encountered error org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state. while recovering transaction KafkaTransactionState [transactionalId=..., producerId=12345, epoch=123]. Presumably this transaction has been already committed before. WARN [org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer] ... Encountered error org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker. while recovering transaction KafkaTransactionState [transactionalId=..., producerId=12345, epoch=12345]. 
Presumably this transaction has been already committed before {code} It sounds to me like the second exception is useful: it indicates that the transaction timeout is too short. The first exception, however, seems superfluous and alarms the user more than it helps. Or what would you do with it? Can we filter out such superfluous exceptions and at least log them at DEBUG level instead? -- This message was sent by Atlassian Jira (v8.3.4#803005)
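Until the logging is adjusted in Flink itself, affected users could demote these messages in their own log4j2 configuration. A sketch (the logger name is taken from the class emitting the warning above; note this is a blunt workaround that also hides any other WARN messages from that class):

```properties
# Hypothetical log4j2 workaround: raise the threshold for the producer's
# recovery warnings so only real errors surface. This suppresses ALL
# WARN-level output from FlinkKafkaProducer, not just the recommit ones.
logger.kafkaproducer.name = org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
logger.kafkaproducer.level = ERROR
```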
[jira] [Created] (FLINK-23812) Support configuration of the RocksDB logging via configuration
Nico Kruber created FLINK-23812: --- Summary: Support configuration of the RocksDB logging via configuration Key: FLINK-23812 URL: https://issues.apache.org/jira/browse/FLINK-23812 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Affects Versions: 1.13.2 Reporter: Nico Kruber Assignee: Nico Kruber Since FLINK-14482 has been merged now, we should allow users to configure not only the log level (FLINK-20911) but also the following parameters, so that they can safely re-enable RocksDB logging, e.g. with a rolling logger: - max log file size via {{state.backend.rocksdb.log.max-file-size}} - number of log files to keep via {{state.backend.rocksdb.log.file-num}} - log directory via {{state.backend.rocksdb.log.dir}}, e.g. to put these logs onto a (separate) volume that may not be local and is retained after container shutdown for debugging purposes -- This message was sent by Atlassian Jira (v8.3.4#803005)
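Put together, a configuration using these options could look like the following sketch of a {{flink-conf.yaml}} fragment (option names are the ones proposed in this ticket and not final; the log-level option and its {{INFO_LEVEL}} value come from FLINK-20911, the path is a placeholder):

```yaml
# Sketch: re-enabling RocksDB logging with a bounded, rolling log
# (option names as proposed in this ticket; not final).
state.backend.rocksdb.log.level: INFO_LEVEL
state.backend.rocksdb.log.max-file-size: 25MB
state.backend.rocksdb.log.file-num: 4
state.backend.rocksdb.log.dir: /var/log/flink/rocksdb   # placeholder path
```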
[jira] [Created] (FLINK-23670) Add Scala formatting checks to training exercises
Nico Kruber created FLINK-23670: --- Summary: Add Scala formatting checks to training exercises Key: FLINK-23670 URL: https://issues.apache.org/jira/browse/FLINK-23670 Project: Flink Issue Type: Bug Components: Documentation / Training / Exercises Affects Versions: 1.13.2 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.13.3 Currently, there are no formatting checks for Scala code in the training exercises. We should employ the same checks that the main Flink project is using. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23669) Avoid using Scala >= 2.12.8 in Flink Training exercises
Nico Kruber created FLINK-23669: --- Summary: Avoid using Scala >= 2.12.8 in Flink Training exercises Key: FLINK-23669 URL: https://issues.apache.org/jira/browse/FLINK-23669 Project: Flink Issue Type: Bug Components: Documentation / Training / Exercises Affects Versions: 1.13.2 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.13.3 The current IDE setup instructions of the Flink training exercises do not mention a specific Scala SDK to set up. For the compatibility reasons described in https://ci.apache.org/projects/flink/flink-docs-stable/docs/dev/datastream/project-configuration/#scala-versions, we should also avoid Scala 2.12.8 and above. -- This message was sent by Atlassian Jira (v8.3.4#803005)
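A minimal sketch of pinning the Scala version in a Gradle build (hypothetical fragment; the training project's actual build files and property names may differ — 2.12.7 is the last 2.12.x release before the binary-compatibility change described in the linked documentation):

```groovy
// Hypothetical build.gradle fragment pinning Scala below 2.12.8.
ext {
    scalaBinaryVersion = '2.12'
    scalaVersion = '2.12.7'  // avoid 2.12.8+ for binary compatibility
}
dependencies {
    implementation "org.scala-lang:scala-library:${scalaVersion}"
}
```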
[jira] [Created] (FLINK-23667) Fix training exercises IDE setup description for Scala
Nico Kruber created FLINK-23667: --- Summary: Fix training exercises IDE setup description for Scala Key: FLINK-23667 URL: https://issues.apache.org/jira/browse/FLINK-23667 Project: Flink Issue Type: Bug Components: Documentation / Training / Exercises Affects Versions: 1.13.2 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.13.3 If you follow the training exercises instructions to set up your IDE with code formatting and the Save Actions plugin while having Scala enabled, it will completely reformat your Scala code files instead of keeping them as is. The instructions should be updated to match the ones used for the Flink main project. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23340) Improve development instructions for flink-training exercises
Nico Kruber created FLINK-23340: --- Summary: Improve development instructions for flink-training exercises Key: FLINK-23340 URL: https://issues.apache.org/jira/browse/FLINK-23340 Project: Flink Issue Type: Improvement Components: Documentation / Training / Exercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.14.0, 1.13.3 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23339) Disable flink-training exercises in Scala by default
Nico Kruber created FLINK-23339: --- Summary: Disable flink-training exercises in Scala by default Key: FLINK-23339 URL: https://issues.apache.org/jira/browse/FLINK-23339 Project: Flink Issue Type: Sub-task Components: Documentation / Training / Exercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.14.0, 1.13.3 On various occasions during trainings we held, people who were not developing in Scala reported issues with the current setup. Making the Scala exercises optional should help reduce friction for everyone else. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23338) Use Spotless for flink-training exercises as well
Nico Kruber created FLINK-23338: --- Summary: Use Spotless for flink-training exercises as well Key: FLINK-23338 URL: https://issues.apache.org/jira/browse/FLINK-23338 Project: Flink Issue Type: Improvement Components: Documentation / Training / Exercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.14.0, 1.13.3 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23337) Properly use the 'shadow' plugin and remove flinkShadowJar
Nico Kruber created FLINK-23337: --- Summary: Properly use the 'shadow' plugin and remove flinkShadowJar Key: FLINK-23337 URL: https://issues.apache.org/jira/browse/FLINK-23337 Project: Flink Issue Type: Sub-task Components: Documentation / Training / Exercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.14.0, 1.13.3 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23336) Use the same log4j version as in Flink 1.13
Nico Kruber created FLINK-23336: --- Summary: Use the same log4j version as in Flink 1.13 Key: FLINK-23336 URL: https://issues.apache.org/jira/browse/FLINK-23336 Project: Flink Issue Type: Improvement Components: Documentation / Training / Exercises Reporter: Nico Kruber Assignee: Nico Kruber -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23335) Add a separate 'runSolution' gradle task
Nico Kruber created FLINK-23335: --- Summary: Add a separate 'runSolution' gradle task Key: FLINK-23335 URL: https://issues.apache.org/jira/browse/FLINK-23335 Project: Flink Issue Type: Sub-task Components: Documentation / Training / Exercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23334) Move 'application' implementation decision to subprojects
Nico Kruber created FLINK-23334: --- Summary: Move 'application' implementation decision to subprojects Key: FLINK-23334 URL: https://issues.apache.org/jira/browse/FLINK-23334 Project: Flink Issue Type: Sub-task Components: Documentation / Training / Exercises Reporter: Nico Kruber Assignee: Nico Kruber -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23333) Improve gradle setup for flink-training exercises
Nico Kruber created FLINK-23333: --- Summary: Improve gradle setup for flink-training exercises Key: FLINK-23333 URL: https://issues.apache.org/jira/browse/FLINK-23333 Project: Flink Issue Type: Improvement Components: Documentation / Training / Exercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23332) Update flink-training exercises gradle version
Nico Kruber created FLINK-23332: --- Summary: Update flink-training exercises gradle version Key: FLINK-23332 URL: https://issues.apache.org/jira/browse/FLINK-23332 Project: Flink Issue Type: Improvement Components: Documentation / Training / Exercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23331) FileReadingWatermarkITCase.testWatermarkEmissionWithChaining fails on Azure
Nico Kruber created FLINK-23331: --- Summary: FileReadingWatermarkITCase.testWatermarkEmissionWithChaining fails on Azure Key: FLINK-23331 URL: https://issues.apache.org/jira/browse/FLINK-23331 Project: Flink Issue Type: Bug Affects Versions: 1.14.0 Reporter: Nico Kruber https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=20223=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=f508e270-48d6-5f1e-3138-42a17e0714f0=4767 {code} Jul 09 09:24:32 [ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 3.266 s <<< FAILURE! - in org.apache.flink.test.streaming.api.FileReadingWatermarkITCase Jul 09 09:24:32 [ERROR] testWatermarkEmissionWithChaining(org.apache.flink.test.streaming.api.FileReadingWatermarkITCase) Time elapsed: 3.191 s <<< FAILURE! Jul 09 09:24:32 java.lang.AssertionError: too few watermarks emitted: 4 Jul 09 09:24:32 at org.junit.Assert.fail(Assert.java:89) Jul 09 09:24:32 at org.junit.Assert.assertTrue(Assert.java:42) Jul 09 09:24:32 at org.apache.flink.test.streaming.api.FileReadingWatermarkITCase.testWatermarkEmissionWithChaining(FileReadingWatermarkITCase.java:66) Jul 09 09:24:32 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) Jul 09 09:24:32 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) Jul 09 09:24:32 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) Jul 09 09:24:32 at java.lang.reflect.Method.invoke(Method.java:498) Jul 09 09:24:32 at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) Jul 09 09:24:32 at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) Jul 09 09:24:32 at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) Jul 09 09:24:32 at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) Jul 09 09:24:32 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) Jul 09 09:24:32 at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) Jul 09 09:24:32 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) Jul 09 09:24:32 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) Jul 09 09:24:32 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) Jul 09 09:24:32 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) Jul 09 09:24:32 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) Jul 09 09:24:32 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) Jul 09 09:24:32 at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) Jul 09 09:24:32 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) Jul 09 09:24:32 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) Jul 09 09:24:32 at org.junit.runners.ParentRunner.run(ParentRunner.java:413) Jul 09 09:24:32 at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365) Jul 09 09:24:32 at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273) Jul 09 09:24:32 at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238) Jul 09 09:24:32 at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159) Jul 09 09:24:32 at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384) Jul 09 09:24:32 at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345) Jul 09 09:24:32 at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126) Jul 09 09:24:32 at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418) Jul 09 09:24:32 {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23312) Use -Dfast for building e2e tests on AZP
Nico Kruber created FLINK-23312: --- Summary: Use -Dfast for building e2e tests on AZP Key: FLINK-23312 URL: https://issues.apache.org/jira/browse/FLINK-23312 Project: Flink Issue Type: Improvement Components: Test Infrastructure Affects Versions: 1.13.1 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.14.0 The "e2e" builder in Azure pipelines builds Flink again on top of what the "compile" builder is already doing. This unnecessarily duplicates a couple of checks that only need to run once and can be skipped by providing {{-Dfast}}. On my local machine with 32GB RAM, 8 physical cores and a fast NVMe SSD, the difference is pretty big: {code} time mvn clean install -Dscala-2.12 -DskipTests -pl flink-dist -am # -> 6:40 min time mvn clean install -Dscala-2.12 -DskipTests -Dfast -pl flink-dist -am # -> 5:40 min {code} Therefore, I'm proposing to add this parameter to the "e2e" builder's compile step. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23311) Improve PojoSerializer test
Nico Kruber created FLINK-23311: --- Summary: Improve PojoSerializer test Key: FLINK-23311 URL: https://issues.apache.org/jira/browse/FLINK-23311 Project: Flink Issue Type: Improvement Components: Tests Affects Versions: 1.13.1 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.14.0 While working with the PojoSerializer a bit more, I noticed a couple of minor things that are off in the current tests: - the test Pojo does not take {{dumm5}} into account for {{hashCode}} and {{equals}} - error messages are not so nice (and mix up the order of expected and actual values) I'll create a PR for fixing these things in one go under this ticket. -- This message was sent by Atlassian Jira (v8.3.4#803005)
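The {{dumm5}} issue generalizes beyond this test: when {{equals}}/{{hashCode}} ignore a field, an equality-based serializer roundtrip test cannot detect that this field was lost. A minimal Python illustration (not the actual test Pojo; the second field name is hypothetical):

```python
# Illustration (not the actual test Pojo): if equality ignores a field,
# a serializer test comparing objects for equality cannot detect that the
# field was lost or corrupted during (de)serialization.
class Pojo:
    def __init__(self, dumm4, dumm5):
        self.dumm4 = dumm4
        self.dumm5 = dumm5

    def __eq__(self, other):
        return self.dumm4 == other.dumm4  # dumm5 is (incorrectly) ignored

original = Pojo(dumm4=1, dumm5="important")
roundtripped = Pojo(dumm4=1, dumm5=None)  # dumm5 silently dropped
print(original == roundtripped)           # the broken comparison still passes
```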
[jira] [Created] (FLINK-23102) Accessing FlameGraphs while not being enabled returns an exception
Nico Kruber created FLINK-23102: --- Summary: Accessing FlameGraphs while not being enabled returns an exception Key: FLINK-23102 URL: https://issues.apache.org/jira/browse/FLINK-23102 Project: Flink Issue Type: Bug Components: Runtime / Web Frontend Affects Versions: 1.13.1 Reporter: Nico Kruber Attachments: image-2021-06-22-17-36-47-730.png Trying to retrieve the FlameGraph in a job that doesn't have it enabled returns this ugly exception: !image-2021-06-22-17-36-47-730.png! Instead, it could mention that this feature is not enabled and describe how to enable it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23101) Flame Graphs initial view says it is 18800 days in the past
Nico Kruber created FLINK-23101: --- Summary: Flame Graphs initial view says it is 18800 days in the past Key: FLINK-23101 URL: https://issues.apache.org/jira/browse/FLINK-23101 Project: Flink Issue Type: Bug Components: Runtime / Web Frontend Reporter: Nico Kruber Attachments: image.png When you look at the Flame Graphs for a task for the first time, it will show an empty space and say that the measurement was ~18800 days in the past (see the attached image). This should rather show something more useful, like "no measurement yet". -- This message was sent by Atlassian Jira (v8.3.4#803005)
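The odd number is consistent with an uninitialized epoch timestamp: a measurement time of 0 (1970-01-01) rendered relative to mid-2021 is about 18800 days in the past, which suggests the UI formats a default/zero timestamp instead of detecting that no measurement exists yet. A quick check (using the ticket's creation date as the reference):

```python
from datetime import date

# A zero (epoch) timestamp rendered relative to the ticket's date:
days = (date(2021, 6, 22) - date(1970, 1, 1)).days
print(days)  # → 18800, matching the "18800 days in the past" display
```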
[jira] [Created] (FLINK-22699) Make ConstantArgumentCount public API
Nico Kruber created FLINK-22699: --- Summary: Make ConstantArgumentCount public API Key: FLINK-22699 URL: https://issues.apache.org/jira/browse/FLINK-22699 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.13.0 Reporter: Nico Kruber {{ConstantArgumentCount}} is quite useful when implementing custom type inference. While the user can, of course, implement an {{ArgumentCount}} themselves with just a few methods, this seems to be the most commonly used implementation and could be provided as public API (currently, it is marked {{@Internal}}). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22405) Support fixed-length chars in the LeadLag built-in function
Nico Kruber created FLINK-22405: --- Summary: Support fixed-length chars in the LeadLag built-in function Key: FLINK-22405 URL: https://issues.apache.org/jira/browse/FLINK-22405 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.12.2 Reporter: Nico Kruber The LeadLag aggregate function does not support type ''CHAR'', as in the following example (a CAST to VARCHAR works around this). Technically, though, there should be no reason to support STRING/VARCHAR but not CHAR: {code:sql} CREATE TEMPORARY VIEW test_cardinality AS SELECT * FROM ( VALUES ('Alice', 'al...@test.com', ARRAY [ 'al...@test.com' ], 'Test Ltd'), ('Alice', 'al...@test.com', ARRAY [ 'al...@test.com' ], 'Test Ltd'), ('Alice', 'al...@test2.com', ARRAY [ 'al...@test.com', 'al...@test2.com' ], 'Test Ltd')) AS t ( name, email, aliases, company ); {code} {code:sql} SELECT name, LEAD(company, 0) AS company FROM test_cardinality WHERE CARDINALITY(aliases) >= 2 GROUP BY name; {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21584) Support UNNEST in LEFT JOINs
Nico Kruber created FLINK-21584: --- Summary: Support UNNEST in LEFT JOINs Key: FLINK-21584 URL: https://issues.apache.org/jira/browse/FLINK-21584 Project: Flink Issue Type: New Feature Components: Table SQL / API Affects Versions: 1.12.1 Reporter: Nico Kruber Currently, UNNEST (for arrays and maps) is only supported in CROSS JOIN operations, but you may actually also want this in a LEFT JOIN fashion in which case you would get {{NULL}} values for the expanded fields. h1. Example {code:sql} CREATE TEMPORARY VIEW input ( f1, f2 ) AS VALUES ('A', STR_TO_MAP('')), ('B', STR_TO_MAP('1, 2')); SELECT * FROM input LEFT JOIN UNNEST(f2); {code} h1. Current workaround {code:sql} CREATE TEMPORARY VIEW input ( f1, f2 ) AS VALUES ('A', STR_TO_MAP('')), ('B', STR_TO_MAP('1, 2')); SELECT * FROM input CROSS JOIN UNNEST(f2) UNION ALL SELECT *, NULLIF('1', '1') AS `KEY`, NULLIF('1', '1') as `VALUE` FROM input WHERE f2 IS NULL OR CARDINALITY(f2) = 0; {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
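The desired LEFT JOIN UNNEST semantics can be sketched in a few lines (illustrative Python, not Flink code): every map entry produces one row, and an empty map still produces one row with NULLs for the expanded fields — exactly what the UNION ALL workaround above emulates.

```python
# Illustrative sketch (not Flink code) of LEFT JOIN UNNEST semantics for a
# map column: each map entry produces one row; an empty map still produces
# one row, with NULLs (None) for the expanded key/value fields.
def left_join_unnest(rows):
    out = []
    for f1, f2 in rows:
        if f2:
            for key, value in f2.items():
                out.append((f1, key, value))
        else:
            out.append((f1, None, None))  # LEFT JOIN keeps the row with NULLs
    return out

data = [("A", {}), ("B", {"k": "v"})]
print(left_join_unnest(data))  # → [('A', None, None), ('B', 'k', 'v')]
```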
[jira] [Created] (FLINK-21583) Allow comments in CSV format without having to ignore parse errors
Nico Kruber created FLINK-21583: --- Summary: Allow comments in CSV format without having to ignore parse errors Key: FLINK-21583 URL: https://issues.apache.org/jira/browse/FLINK-21583 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Ecosystem Affects Versions: 1.12.1 Reporter: Nico Kruber Currently, when you pass {{'csv.allow-comments' = 'true'}} to a table definition, you also have to set {{'csv.ignore-parse-errors' = 'true'}} to actually skip the commented-out lines (the docs mention this prominently as well). This, however, may mask real parsing errors that you want to be notified of. I would like to propose that {{allow-comments}} skip commented-out lines automatically, since these should not be interpreted anyway. -- This message was sent by Atlassian Jira (v8.3.4#803005)
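The desired behavior — skipping comment lines without swallowing real parse errors — can be sketched as follows (illustrative Python, not the connector code; the comment prefix is an assumption):

```python
import csv
import io

# Illustrative sketch (not the Flink connector): filter out comment lines
# before parsing, so malformed data lines can still raise errors instead
# of being silently dropped by a blanket ignore-parse-errors setting.
def parse_csv_skipping_comments(text, comment_prefix="#"):
    lines = (line for line in io.StringIO(text)
             if not line.lstrip().startswith(comment_prefix))
    return list(csv.reader(lines))

sample = "# header comment\n1,foo\n# another comment\n2,bar\n"
print(parse_csv_skipping_comments(sample))  # → [['1', 'foo'], ['2', 'bar']]
```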
[jira] [Created] (FLINK-21569) Flink SQL with CSV file input job hangs
Nico Kruber created FLINK-21569: --- Summary: Flink SQL with CSV file input job hangs Key: FLINK-21569 URL: https://issues.apache.org/jira/browse/FLINK-21569 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Runtime Affects Versions: 1.12.1 Reporter: Nico Kruber Attachments: airports.csv, flights-small2.csv In extension to FLINK-21567, I actually also got the job to be stuck on cancellation by doing the following in the SQL client: * configure SQL client defaults to run with parallelism 2 * execute the following statement {code} CREATE TABLE `airports` ( `IATA_CODE` CHAR(3), `AIRPORT` STRING, `CITY` STRING, `STATE` CHAR(2), `COUNTRY` CHAR(3), `LATITUDE` DOUBLE NULL, `LONGITUDE` DOUBLE NULL, PRIMARY KEY (`IATA_CODE`) NOT ENFORCED ) WITH ( 'connector' = 'filesystem', 'path' = 'file:///tmp/kaggle-flight-delay/airports.csv', 'format' = 'csv', 'csv.allow-comments' = 'true', 'csv.ignore-parse-errors' = 'true', 'csv.null-literal' = '' ); CREATE TABLE `flights` ( `_YEAR` CHAR(4), `_MONTH` CHAR(2), `_DAY` CHAR(2), `_DAY_OF_WEEK` TINYINT, `AIRLINE` CHAR(2), `FLIGHT_NUMBER` SMALLINT, `TAIL_NUMBER` CHAR(6), `ORIGIN_AIRPORT` CHAR(3), `DESTINATION_AIRPORT` CHAR(3), `_SCHEDULED_DEPARTURE` CHAR(4), `SCHEDULED_DEPARTURE` AS TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'), `_DEPARTURE_TIME` CHAR(4), `DEPARTURE_DELAY` SMALLINT, `DEPARTURE_TIME` AS TIMESTAMPADD(MINUTE, CAST(`DEPARTURE_DELAY` AS INT), TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00')), `TAXI_OUT` SMALLINT, `WHEELS_OFF` CHAR(4), `SCHEDULED_TIME` SMALLINT, `ELAPSED_TIME` SMALLINT, `AIR_TIME` SMALLINT, `DISTANCE` SMALLINT, `WHEELS_ON` CHAR(4), `TAXI_IN` SMALLINT, `SCHEDULED_ARRIVAL` CHAR(4), `ARRIVAL_TIME` CHAR(4), 
`ARRIVAL_DELAY` SMALLINT, `DIVERTED` BOOLEAN, `CANCELLED` BOOLEAN, `CANCELLATION_REASON` CHAR(1), `AIR_SYSTEM_DELAY` SMALLINT, `SECURITY_DELAY` SMALLINT, `AIRLINE_DELAY` SMALLINT, `LATE_AIRCRAFT_DELAY` SMALLINT, `WEATHER_DELAY` SMALLINT ) WITH ( 'connector' = 'filesystem', 'path' = 'file:///tmp/kaggle-flight-delay/flights-small2.csv', 'format' = 'csv', 'csv.null-literal' = '' ); SELECT `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`, `NUM_DELAYS` FROM ( SELECT `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`, COUNT(*) AS `NUM_DELAYS`, ROW_NUMBER() OVER (ORDER BY COUNT(*) DESC) AS rownum FROM flights, airports WHERE `ORIGIN_AIRPORT` = `IATA_CODE` AND `DEPARTURE_DELAY` > 0 GROUP BY `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`) WHERE rownum <= 10; {code} Results are shown in the CLI but after quitting the result view, the job seems stuck in CANCELLING until (at least) one of the TMs shuts itself down because a task wouldn't react to the cancelling signal. This appears in its TM logs: {code} 2021-03-02 18:39:19,451 WARN org.apache.flink.runtime.taskmanager.Task [] - Task 'Source: TableSourceScan(table=[[default_catalog, default_database, airports, project=[IATA_CODE, AIRPORT, STATE]]], fields=[IATA_CODE, AIRPORT, STATE]) (2/2)#0' did not react to cancelling signal for 30 seconds, but is stuck in method: sun.misc.Unsafe.park(Native Method) java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947) org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:653) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585) org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) 
java.lang.Thread.run(Thread.java:748) ... 2021-03-02 18:39:49,447 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Task did not exit gracefully within 180 + seconds. org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds. at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1685) [flink-dist_2.12-1.12.1.jar:1.12.1] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] 2021-03-02 18:39:49,448 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down... org.apache.flink.util.FlinkRuntimeException: Task did not exit
[jira] [Created] (FLINK-21568) Navigating in SQL client can lead to SqlExecutionException
Nico Kruber created FLINK-21568: --- Summary: Navigating in SQL client can lead to SqlExecutionException Key: FLINK-21568 URL: https://issues.apache.org/jira/browse/FLINK-21568 Project: Flink Issue Type: Bug Components: Table SQL / Client Affects Versions: 1.12.1 Reporter: Nico Kruber Pressing 'p' in the SQL CLI's result browser before any result is available will result in the following exception being thrown: {code} 2021-03-02 18:27:05,153 WARN org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement. org.apache.flink.table.client.gateway.SqlExecutionException: Invalid page '1'. at org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.retrievePage(MaterializedCollectStreamResult.java:177) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.gateway.local.LocalExecutor.retrieveResultPage(LocalExecutor.java:415) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.cli.CliTableResultView.updatePage(CliTableResultView.java:293) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.cli.CliTableResultView.gotoPreviousPage(CliTableResultView.java:381) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.cli.CliTableResultView.evaluate(CliTableResultView.java:183) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.cli.CliTableResultView.evaluate(CliTableResultView.java:50) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.cli.CliView.open(CliView.java:125) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:675) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:323) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1] at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_282] at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:214) 
[flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:144) [flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.SqlClient.start(SqlClient.java:115) [flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) [flink-sql-client_2.12-1.12.1.jar:1.12.1] {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21567) CSV Format exception while parsing: ArrayIndexOutOfBoundsException: 4000
Nico Kruber created FLINK-21567: --- Summary: CSV Format exception while parsing: ArrayIndexOutOfBoundsException: 4000 Key: FLINK-21567 URL: https://issues.apache.org/jira/browse/FLINK-21567 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.12.1, 1.11.3 Reporter: Nico Kruber Attachments: flights-small.csv I've been trying to play a bit with the data available at https://www.kaggle.com/usdot/flight-delays and got the following exception: {code} 2021-02-16 18:57:37,913 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, flights, filter=[], project=[ORIGIN_AIRPORT, DEPARTURE_DELAY]]], fields=[ORIGIN_AIRPORT, DEPARTURE_DELAY]) -> Calc(select=[ORIGIN_AIRPORT], where=[(DEPARTURE_DELAY > 0)]) -> LocalHashAggregate(groupBy=[ORIGIN_AIRPORT], select=[ORIGIN_AIRPORT, Partial_COUNT(*) AS count1$0]) (1/1)#0 (ebbf1204d875a5a4ace529df0d5ba719) switched from RUNNING to FAILED. java.io.IOException: Failed to deserialize CSV row. 
at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:257) ~[flink-csv-1.12.1.jar:1.12.1] at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:162) ~[flink-csv-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241) ~[flink-dist_2.12-1.12.1.jar:1.12.1] Caused by: java.lang.ArrayIndexOutOfBoundsException: 4000 at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.skipLinesWhenNeeded(CsvDecoder.java:527) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.startNewLine(CsvDecoder.java:499) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser._handleObjectRowEnd(CsvParser.java:1067) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser._handleNextEntry(CsvParser.java:858) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser.nextFieldName(CsvParser.java:665) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:250) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:68) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:15) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator.nextValue(MappingIterator.java:280) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:250) ~[flink-csv-1.12.1.jar:1.12.1] ... 5 more {code} h1. Fully working example: Using the attached file (derived from the data on flight delays, linked above) and the SQL CLI: {code} CREATE TABLE `flights` ( `_YEAR` CHAR(4), `_MONTH` CHAR(2), `_DAY` CHAR(2), `_DAY_OF_WEEK` TINYINT, `AIRLINE` CHAR(2), `FLIGHT_NUMBER` SMALLINT, `TAIL_NUMBER` CHAR(6), `ORIGIN_AIRPORT` CHAR(3), `DESTINATION_AIRPORT` CHAR(3), `_SCHEDULED_DEPARTURE` CHAR(4), `SCHEDULED_DEPARTURE` AS TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'), `_DEPARTURE_TIME` CHAR(4), `DEPARTURE_DELAY` SMALLINT, `DEPARTURE_TIME` AS TIMESTAMPADD(MINUTE, CAST(`DEPARTURE_DELAY` AS INT), TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00')), `TAXI_OUT` SMALLINT, `WHEELS_OFF` CHAR(4), `SCHEDULED_TIME` SMALLINT, `ELAPSED_TIME`
[jira] [Created] (FLINK-21566) Improve error message for "Unsupported casting"
Nico Kruber created FLINK-21566: --- Summary: Improve error message for "Unsupported casting" Key: FLINK-21566 URL: https://issues.apache.org/jira/browse/FLINK-21566 Project: Flink Issue Type: New Feature Components: Table SQL / API Affects Versions: 1.12.1 Reporter: Nico Kruber In a situation like the one from FLINK-21565, neither the error message {{Unsupported casting from TINYINT to INTERVAL SECOND(3)}} nor the exception trace (see FLINK-21565) gives you a good hint about where the error is, especially if you have many statements with TINYINTs or operations on them. The query parser could highlight the location of the error inside the SQL statement that the user provided. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21565) Support more integer types in TIMESTAMPADD
Nico Kruber created FLINK-21565: --- Summary: Support more integer types in TIMESTAMPADD Key: FLINK-21565 URL: https://issues.apache.org/jira/browse/FLINK-21565 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.12.1 Reporter: Nico Kruber At the moment, {{TIMESTAMPADD}} does not seem to support {{SMALLINT}} or {{TINYINT}} types which should be perfectly suitable for auto-conversion (in contrast to BIGINT or floating numbers where I would expect the user to cast it appropriately). It currently fails with the following exception: {code} org.apache.flink.table.planner.codegen.CodeGenException: Unsupported casting from TINYINT to INTERVAL SECOND(3). at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.numericCasting(ScalarOperatorGens.scala:2352) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.generateBinaryArithmeticOperator(ScalarOperatorGens.scala:93) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:590) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:529) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) ~[flink-table_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.TraversableLike.map(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.TraversableLike.map$(TraversableLike.scala:226) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) ~[flink-table_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.TraversableLike.map(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.TraversableLike.map$(TraversableLike.scala:226) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) ~[flink-table_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:155) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:143) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
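The auto-conversion suggested in the ticket can be illustrated with a small sketch (plain Java, not Flink's actual planner code): {{TINYINT}} and {{SMALLINT}} always fit into a 32-bit integer, so widening them for the interval argument is lossless, while {{BIGINT}} and floating-point types would still require an explicit cast by the user.

```java
import java.util.EnumSet;
import java.util.Set;

// Illustrative sketch only (not Flink's code generator): small integer types
// can always be widened to INT without loss, so TIMESTAMPADD's count argument
// could accept them implicitly.
public class ImplicitWidening {
    enum SqlType { TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE }

    // Types that fit into a 32-bit int and are therefore safe to widen
    // automatically; BIGINT and floating-point types would still require an
    // explicit CAST, as suggested in the ticket.
    private static final Set<SqlType> SAFE_TO_WIDEN =
            EnumSet.of(SqlType.TINYINT, SqlType.SMALLINT, SqlType.INT);

    static boolean canUseAsIntervalCount(SqlType t) {
        return SAFE_TO_WIDEN.contains(t);
    }
}
```

Until such widening exists, a workaround is an explicit cast in SQL, e.g. {{TIMESTAMPADD(MINUTE, CAST(delta AS INTEGER), ts)}} (the column names {{delta}} and {{ts}} are hypothetical).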
[jira] [Created] (FLINK-21563) Support using computed columns when defining (new) computed columns
Nico Kruber created FLINK-21563: --- Summary: Support using computed columns when defining (new) computed columns Key: FLINK-21563 URL: https://issues.apache.org/jira/browse/FLINK-21563 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.11.3 Reporter: Nico Kruber To avoid code duplications, it would be nice to be able to use computed columns in later computations of new computed columns, e.g. {code} CREATE TABLE `flights` ( `_YEAR` CHAR(4), `_MONTH` CHAR(2), `_DAY` CHAR(2), `_SCHEDULED_DEPARTURE` CHAR(4), `SCHEDULED_DEPARTURE` AS TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'), `_DEPARTURE_TIME` CHAR(4), `DEPARTURE_DELAY` SMALLINT, `DEPARTURE_TIME` AS SCHEDULED_DEPARTURE + DEPARTURE_DELAY )... {code} Otherwise, a user would have to repeat these calculations over and over again which is not that maintainable. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21562) Add more informative message on CSV parsing errors
Nico Kruber created FLINK-21562: --- Summary: Add more informative message on CSV parsing errors Key: FLINK-21562 URL: https://issues.apache.org/jira/browse/FLINK-21562 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / API Affects Versions: 1.11.3 Reporter: Nico Kruber I was parsing a CSV file with comments in it and used {{'csv.allow-comments' = 'true'}} without also passing {{'csv.ignore-parse-errors' = 'true'}} to the table DDL to not hide any actual format errors. Since I didn't just have strings in my table, this did of course stumble on the commented-out line with the following error: {code} 2021-02-16 17:45:53,055 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, airports]], fields=[IATA_CODE, AIRPORT, CITY, STATE, COUNTRY, LATITUDE, LONGITUDE]) -> SinkConversionToTuple2 -> Sink: SQL Client Stream Collect Sink (1/1)#0 (9f3a3965f18ed99ee42580bdb559ba66) switched from RUNNING to FAILED. java.io.IOException: Failed to deserialize CSV row. 
at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:257) ~[flink-csv-1.12.1.jar:1.12.1] at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:162) ~[flink-csv-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241) ~[flink-dist_2.12-1.12.1.jar:1.12.1] Caused by: java.lang.NumberFormatException: empty String at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842) ~[?:1.8.0_275] at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110) ~[?:1.8.0_275] at java.lang.Double.parseDouble(Double.java:538) ~[?:1.8.0_275] at org.apache.flink.formats.csv.CsvToRowDataConverters.convertToDouble(CsvToRowDataConverters.java:203) ~[flink-csv-1.12.1.jar:1.12.1] at org.apache.flink.formats.csv.CsvToRowDataConverters.lambda$createNullableConverter$ac6e531e$1(CsvToRowDataConverters.java:113) ~[flink-csv-1.12.1.jar:1.12.1] at org.apache.flink.formats.csv.CsvToRowDataConverters.lambda$createRowConverter$18bb1dd$1(CsvToRowDataConverters.java:98) ~[flink-csv-1.12.1.jar:1.12.1] at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:251) ~[flink-csv-1.12.1.jar:1.12.1] ... 
5 more {code} Two things should be improved here: # commented-out lines should be ignored by default (potentially, FLINK-17133 addresses this or at least gives the user the power to do so) # the error message itself is not very informative: "empty String". This ticket is about the latter. I would suggest having at least a few more pointers that direct the user to the source of the problem in the CSV file/item/... - here, the data type could simply be wrong, or the CSV file itself may be corrupted, and the user would need to investigate. What exactly helps probably depends on the input connector this format is working with: the line number in a CSV file would be best; where that is not available, we could show the whole line or at least a few surrounding fields.
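The kind of context the ticket asks for could look roughly like this (a hedged sketch; the method name and parameters are illustrative, not Flink's actual CSV converter API): wrap the low-level {{NumberFormatException}} with the field name, column index, and raw value so the user can locate the bad record.

```java
// Illustrative sketch: enrich the parse error with field/position context
// instead of surfacing only "empty String".
public class CsvFieldParser {
    static double parseDoubleField(String raw, String fieldName, int column, long lineNumber) {
        try {
            return Double.parseDouble(raw);
        } catch (NumberFormatException e) {
            // Keep the original exception as the cause, but lead with the
            // information that actually helps the user find the bad line.
            throw new IllegalArgumentException(
                    String.format(
                            "Failed to parse field '%s' (column %d, line %d) from value \"%s\" as DOUBLE",
                            fieldName, column, lineNumber, raw),
                    e);
        }
    }
}
```

For the commented-out line from the report, this would turn the bare "empty String" into a message naming the field and line.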
[jira] [Created] (FLINK-20886) Add the option to get a threaddump on checkpoint timeouts
Nico Kruber created FLINK-20886: --- Summary: Add the option to get a threaddump on checkpoint timeouts Key: FLINK-20886 URL: https://issues.apache.org/jira/browse/FLINK-20886 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Affects Versions: 1.12.0 Reporter: Nico Kruber For debugging checkpoint timeouts, I was thinking about the following addition to Flink: When a checkpoint times out and the async thread is still running, create a threaddump [1] and either add this to the checkpoint stats, log it, or write it out. This may help identifying where the checkpoint is stuck (maybe a lock, could also be in a third party lib like the FS connectors,...). It would give us some insights into what the thread is currently doing. Limiting the scope of the threads would be nice but may not be possible in the general case since additional threads (spawned by the FS connector lib, or otherwise connected) may interact with the async thread(s) by e.g. going through the same locks. [1] https://crunchify.com/how-to-generate-java-thread-dump-programmatically/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
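Capturing such a thread dump programmatically is plain JMX, along the lines of the linked article (this is a minimal sketch of the proposed behaviour, not a Flink API):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Sketch: when a checkpoint times out, capture a thread dump so it can be
// logged, written out, or attached to the checkpoint stats.
public class ThreadDumper {
    static String dumpAllThreads() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        // true/true also requests lock and synchronizer information, which is
        // exactly what helps when the async checkpoint thread is stuck on a lock.
        for (ThreadInfo info : bean.dumpAllThreads(true, true)) {
            sb.append(info.toString());
        }
        return sb.toString();
    }
}
```

As the ticket notes, limiting the dump to checkpoint-related threads is hard in general, since threads spawned by FS connector libraries may hold the same locks; a full dump sidesteps that.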
[jira] [Created] (FLINK-20674) Wrong send/received stats with UNION ALL
Nico Kruber created FLINK-20674: --- Summary: Wrong send/received stats with UNION ALL Key: FLINK-20674 URL: https://issues.apache.org/jira/browse/FLINK-20674 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.11.3, 1.12.0 Reporter: Nico Kruber When using {{UNION ALL}} to union the same table twice, the number of records and bytes shown as sent is just half of what the next task receives. This is reproducible with: {code} CREATE TEMPORARY TABLE test ( `number` SMALLINT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); SELECT * FROM ( (SELECT * FROM test) UNION ALL (SELECT * FROM test) ) {code} Arguably, the use case is not too useful, but other combinations may be affected, too.
[jira] [Created] (FLINK-20217) More fine-grained timer processing
Nico Kruber created FLINK-20217: --- Summary: More fine-grained timer processing Key: FLINK-20217 URL: https://issues.apache.org/jira/browse/FLINK-20217 Project: Flink Issue Type: Improvement Components: API / DataStream Affects Versions: 1.11.2, 1.10.2, 1.12.0 Reporter: Nico Kruber Timers are currently processed in one big block under the checkpoint lock (in {{InternalTimerServiceImpl#advanceWatermark}}). This can be problematic in a number of checkpointing scenarios and can lead to checkpoints timing out (even unaligned checkpoints would not help). If you have a huge number of timers to process when advancing the watermark and the task is also back-pressured, the situation is even worse, since you would block on the checkpoint lock while also waiting for buffers/credits from the receiver. I propose to make this loop more fine-grained so that it is interruptible by checkpoints, but maybe there is also some other way to improve this.
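The fine-grained loop proposed here could look roughly like the following (an illustrative sketch, not Flink's actual {{InternalTimerServiceImpl}}): fire due timers in bounded chunks and yield between chunks, e.g. when a checkpoint wants the lock.

```java
import java.util.Queue;
import java.util.function.BooleanSupplier;

// Sketch: process due timers in small chunks so a checkpoint can interrupt
// the loop, instead of draining everything in one uninterruptible block.
public class ChunkedTimerProcessing {
    static int fireDueTimers(Queue<Runnable> dueTimers, int chunkSize, BooleanSupplier shouldYield) {
        int fired = 0;
        while (!dueTimers.isEmpty()) {
            // Fire at most chunkSize timers, then give other work a chance.
            for (int i = 0; i < chunkSize && !dueTimers.isEmpty(); i++) {
                dueTimers.poll().run();
                fired++;
            }
            if (shouldYield.getAsBoolean()) {
                break; // caller re-invokes after the checkpoint has run
            }
        }
        return fired;
    }
}
```

The chunk size trades timer-firing latency against checkpoint responsiveness; the real implementation would also have to preserve timer ordering and watermark semantics.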
[jira] [Created] (FLINK-20104) Web UI checkpoint stats "refresh" button has to be clicked twice
Nico Kruber created FLINK-20104: --- Summary: Web UI checkpoint stats "refresh" button has to be clicked twice Key: FLINK-20104 URL: https://issues.apache.org/jira/browse/FLINK-20104 Project: Flink Issue Type: Bug Components: Runtime / Web Frontend Affects Versions: 1.11.2, 1.12.0 Environment: Firefox on Linux Reporter: Nico Kruber In order to get the UI's checkpoint stats updated, I always have to click the refresh button twice - a single click doesn't change anything. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20099) HeapStateBackend checkpoint error hidden under cryptic message
Nico Kruber created FLINK-20099: --- Summary: HeapStateBackend checkpoint error hidden under cryptic message Key: FLINK-20099 URL: https://issues.apache.org/jira/browse/FLINK-20099 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Runtime / State Backends Affects Versions: 1.11.2 Reporter: Nico Kruber Attachments: Screenshot_20201112_001331.png When the memory state back-end hits a certain size, it fails to permit checkpoints. Even though a very detailed exception is thrown at its source, this is neither logged nor shown in the UI: * Logs just contain: {code:java} 00:06:41.462 [jobmanager-future-thread-14] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 2 by task 8eb303cd3196310cb2671212f4ed013c of job c9b7a410bd3143864ca23ba89595d878 at 6a73bcf2-46b6-4735-a616-fdf09ff1471c @ localhost (dataPort=-1). {code} * UI: (also see the attached Screenshot_20201112_001331.png) {code:java} Failure Message: The job has failed. {code} -> this isn't even true: the job is still running fine! Debugging into {{PendingCheckpoint#abort()}} reveals that the causing exception is actually still in there but the detailed information from it is just never used. For reference, this is what is available there and should be logged or shown: {code:java} java.lang.Exception: Could not materialize checkpoint 2 for operator aggregates -> (Sink: sink-agg-365, Sink: sink-agg-180, Sink: sink-agg-45, Sink: sink-agg-30) (4/4). 
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:191) at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:138) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=6122737 , maxSize=5242880 . Consider using a different state backend, like the File System State backend. at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:479) at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:50) at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:102) ... 3 more Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=6122737 , maxSize=5242880 . Consider using a different state backend, like the File System State backend. 
at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.checkSize(MemCheckpointStreamFactory.java:64) at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetBytes(MemCheckpointStreamFactory.java:145) at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetHandle(MemCheckpointStreamFactory.java:126) at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77) at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:199) at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158) at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:476) ... 5 more {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
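The detailed {{IOException}} above is present in the cause chain but never surfaced to the logs or the UI. A minimal sketch of the fix direction (illustrative, not Flink code): unwrap the cause chain and report the root message instead of a generic "The job has failed".

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;

// Sketch: walk the exception cause chain to find the root failure, whose
// message carries the actionable detail (state size, suggested backend).
public class RootCause {
    static String rootCauseMessage(Throwable t) {
        Throwable cur = t;
        while (cur.getCause() != null && cur.getCause() != cur) {
            cur = cur.getCause();
        }
        return cur.getClass().getSimpleName() + ": " + cur.getMessage();
    }

    public static void main(String[] args) {
        Throwable failure = new Exception("Could not materialize checkpoint 2 for operator aggregates",
                new ExecutionException(new IOException(
                        "Size of the state is larger than the maximum permitted memory-backed state.")));
        System.out.println(rootCauseMessage(failure));
    }
}
```

Applied to the stack trace above, this would surface the "Size of the state is larger than the maximum permitted memory-backed state" message instead of hiding it.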
[jira] [Created] (FLINK-20061) Row constructor unsupported in aggregation function
Nico Kruber created FLINK-20061: --- Summary: Row constructor unsupported in aggregation function Key: FLINK-20061 URL: https://issues.apache.org/jira/browse/FLINK-20061 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.11.2 Reporter: Nico Kruber I was trying to use {{ROW}} in a user-defined aggregate function in a query like this: {code} SELECT `id`, HOP_END(`timestamp`, INTERVAL '10' MINUTE, INTERVAL '1' MINUTE) AS `window_end`, RowMaxv0(`amount`, ROW(`timestamp`, `amount`, `payload`)) AS `max_amount` FROM `input` GROUP BY HOP(`timestamp`, INTERVAL '10' MINUTE, INTERVAL '1' MINUTE), `id`; {code} Eventually this resulted in an "unsupported" exception from Calcite: {code} Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. null at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) at com.ververica.platform.sql.functions.RowMaxv0.main(RowMaxv0.java:93) Caused by: java.lang.UnsupportedOperationException at org.apache.calcite.sql.validate.SqlValidatorImpl.validateColumnListParams(SqlValidatorImpl.java:5689) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:268) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785) at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:481) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) ... 5 more {code} A workaround for this is to go via a subquery like the following but ultimately, this should result in the same thing (a simple projection). 
{code} SELECT `id`, HOP_END(`timestamp`, INTERVAL '10' MINUTE, INTERVAL '1' MINUTE) AS `window_end`, RowMaxv0(`amount`, `row`) AS `max_amount` FROM (SELECT `id`, `timestamp`, `amount`, ROW(`timestamp`, `amount`, `payload`) AS `row` FROM `input`) GROUP BY HOP(`timestamp`, INTERVAL '10' MINUTE, INTERVAL '1' MINUTE), `id` {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20059) Outdated SQL docs on aggregate functions' merge
Nico Kruber created FLINK-20059: --- Summary: Outdated SQL docs on aggregate functions' merge Key: FLINK-20059 URL: https://issues.apache.org/jira/browse/FLINK-20059 Project: Flink Issue Type: Bug Components: Documentation, Table SQL / API Affects Versions: 1.11.2, 1.12.0 Reporter: Nico Kruber In the java docs as well as the user docs, the {{merge}} method of an aggregation UDF is described as optional, e.g. {quote}Merges a group of accumulator instances into one accumulator instance. This function must be implemented for data stream session window grouping aggregates and data set grouping aggregates.{quote} However, it seems that nowadays this method is required in more cases (I stumbled on this for a HOP window in streaming): {code} StreamExecGlobalGroupAggregate.scala .needMerge(mergedAccOffset, mergedAccOnHeap, mergedAccExternalTypes) StreamExecGroupWindowAggregateBase.scala generator.needMerge(mergedAccOffset = 0, mergedAccOnHeap = false) StreamExecIncrementalGroupAggregate.scala .needMerge(mergedAccOffset, mergedAccOnHeap = true, mergedAccExternalTypes) StreamExecLocalGroupAggregate.scala .needMerge(mergedAccOffset = 0, mergedAccOnHeap = true) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19999) State Processor API classes leaking into savepoint
Nico Kruber created FLINK-19999: --- Summary: State Processor API classes leaking into savepoint Key: FLINK-19999 URL: https://issues.apache.org/jira/browse/FLINK-19999 Project: Flink Issue Type: Bug Components: API / State Processor Affects Versions: 1.11.2 Reporter: Nico Kruber Currently, any configuration for serializers that you are using when writing a State Processor API job will be shared with the serializers that are used for writing a savepoint. However, your normal job shouldn't necessarily depend on (helper) classes that you only use in the State Processor API job. By default, for example, {{ExecutionConfig#autoTypeRegistrationEnabled = true}} and thus classes like {{org.apache.flink.runtime.checkpoint.OperatorSubtaskState}} will be registered with Kryo and will thus also be needed when reading the created savepoint if you have Kryo serialization in your job. This particular instance can be worked around by calling {{ExecutionConfig#disableAutoTypeRegistration()}} but the problem is probably bigger and extends to other type registrations, e.g. POJOs, as well.
[jira] [Created] (FLINK-19972) Provide more details when type serializers are not compatible
Nico Kruber created FLINK-19972: --- Summary: Provide more details when type serializers are not compatible Key: FLINK-19972 URL: https://issues.apache.org/jira/browse/FLINK-19972 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Affects Versions: 1.11.2 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.12.0 Currently, when the type serializer is incompatible, you get exceptions like these: {code:java} StateMigrationException("For heap backends, the new namespace serializer must be compatible."); StateMigrationException("The new namespace serializer must be compatible."); StateMigrationException("For heap backends, the new state serializer must not be incompatible."); StateMigrationException("The new state serializer cannot be incompatible.") StateMigrationException("The new key serializer must be compatible."){code} which are not really helpful to the user in debugging serializers. Since we already have the old serializer (snapshot) and the new one available, we should add this detail to the exceptions for improved usability. -- This message was sent by Atlassian Jira (v8.3.4#803005)
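Since both the old serializer (snapshot) and the new one are available at the point where these exceptions are thrown, the improved message could be built roughly like this (a hedged sketch; in Flink these would be {{TypeSerializer}} instances and a {{StateMigrationException}}, plain {{Object}} and {{String}} are used here to keep the example self-contained):

```java
// Sketch: name both serializer classes in the incompatibility message so the
// user can see immediately which serializers are being compared.
public class SerializerMismatch {
    static String incompatibleMessage(String what, Object oldSerializer, Object newSerializer) {
        return String.format(
                "The new %s serializer (%s) must be compatible with the previous %s serializer (%s).",
                what, newSerializer.getClass().getName(), what, oldSerializer.getClass().getName());
    }
}
```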
[jira] [Created] (FLINK-19462) Checkpoint statistics for unfinished task snapshots
Nico Kruber created FLINK-19462: --- Summary: Checkpoint statistics for unfinished task snapshots Key: FLINK-19462 URL: https://issues.apache.org/jira/browse/FLINK-19462 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing, Runtime / Metrics Reporter: Nico Kruber If a checkpoint times out, there are currently no stats on the not-yet-finished tasks in the Web UI, so you have to crawl into (debug?) logs. It would be nice to have these incomplete stats in there instead so that you know quickly what was going on. I could think of these ways to accomplish this: * the checkpoint coordinator could ask the TMs for it after failing the checkpoint or * the TMs could send the stats when they notice that the checkpoint is aborted Maybe there are more options, but I think, this improvement in general would benefit debugging checkpoints. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19381) Fix docs about relocatable savepoints
Nico Kruber created FLINK-19381: --- Summary: Fix docs about relocatable savepoints Key: FLINK-19381 URL: https://issues.apache.org/jira/browse/FLINK-19381 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.11.2, 1.12.0 Reporter: Nico Kruber Although savepoints are relocatable since Flink 1.11, the docs still state otherwise, for example in [https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#triggering-savepoints] The warning there, as well as the other changes from FLINK-15863, should be removed again and potentially replaced with new constraints. One known constraint is that savepoints are currently not relocatable if task-owned state is used ({{GenericWriteAheadLog}} sink).
[jira] [Created] (FLINK-19112) No access to metric group in ScalarFunction when optimizing
Nico Kruber created FLINK-19112: --- Summary: No access to metric group in ScalarFunction when optimizing Key: FLINK-19112 URL: https://issues.apache.org/jira/browse/FLINK-19112 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.11.1 Reporter: Nico Kruber Attachments: MetricsGroupBug.java Under some circumstances, I cannot access {{context.getMetricGroup()}} in a {{ScalarFunction}} like this (full job attached): {code:java} public static class MyUDF extends ScalarFunction { @Override public void open(FunctionContext context) throws Exception { super.open(context); context.getMetricGroup(); } public Integer eval(Integer id) { return id; } } {code} which leads to this exception: {code:java} Exception in thread "main" java.lang.UnsupportedOperationException: getMetricGroup is not supported when optimizing at org.apache.flink.table.planner.codegen.ConstantFunctionContext.getMetricGroup(ExpressionReducer.scala:249) at com.ververica.MetricsGroupBug$MyUDF.open(MetricsGroupBug.java:57) at ExpressionReducer$2.open(Unknown Source) at org.apache.flink.table.planner.codegen.ExpressionReducer.reduce(ExpressionReducer.scala:118) at org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressionsInternal(ReduceExpressionsRule.java:696) at org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressions(ReduceExpressionsRule.java:618) at org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:303) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328) at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562) at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427) at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264) at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223) at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:84) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264) at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700) at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565) at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549) at com.ververica.MetricsGroupBug.main(MetricsGroupBug.java:50) {code} I also tried to work around this with a try-catch, assuming that this method is called once during optimisation and another time at runtime.
[jira] [Created] (FLINK-18962) Improve error message if checkpoint directory is not writable
Nico Kruber created FLINK-18962: --- Summary: Improve error message if checkpoint directory is not writable Key: FLINK-18962 URL: https://issues.apache.org/jira/browse/FLINK-18962 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Affects Versions: 1.11.1 Reporter: Nico Kruber If the checkpoint directory from {{state.checkpoints.dir}} is not writable by the user that Flink is running with, checkpoints will be declined, but the real cause is not mentioned anywhere: * the Web UI says: "Cause: The job has failed" (the Flink job is running though) * the JM log says: {code} 2020-08-14 12:13:18,820 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering checkpoint 2 (type=CHECKPOINT) @ 159738819 for job 2c567b14e8d0833404931ef47dfec266. 2020-08-14 12:13:18,921 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Decline checkpoint 2 by task 0d4fd75374ad16c8d963679e3c2171ec of job 2c567b14e8d0833404931ef47dfec266 at a184deea621e3923fbfcb1d899348448 @ Nico-PC.lan (dataPort=35531). {code} * the TM log says: {code} 2020-08-14 12:13:14,102 INFO org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl [] - Checkpoint 1 has been notified as aborted, would not trigger any checkpoint. {code} And that's it. It should have a real error message indicating that the checkpoint (sub)-directory could not be created. -- This message was sent by Atlassian Jira (v8.3.4#803005)
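A fail-fast check along the lines suggested here could be sketched as follows (illustrative only, not Flink's actual validation code): verify up front that the configured checkpoint directory can be created and written to, and fail with an explicit message instead of silently declining checkpoints.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch: validate the checkpoint directory early so the real cause appears
// in the error message instead of a generic "Decline checkpoint" log line.
public class CheckpointDirCheck {
    static void ensureWritable(Path checkpointDir) throws IOException {
        // Creates the directory (and parents); throws if a parent is not writable.
        Files.createDirectories(checkpointDir);
        if (!Files.isWritable(checkpointDir)) {
            throw new IOException(
                    "Checkpoint directory " + checkpointDir
                            + " is not writable by the user running Flink"
                            + " (check 'state.checkpoints.dir' and file permissions).");
        }
    }
}
```

For distributed filesystems the check would go through the respective {{FileSystem}} abstraction rather than {{java.nio.file}}, but the principle is the same.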
[jira] [Created] (FLINK-18955) Add snapshot path to job startup message
Nico Kruber created FLINK-18955: --- Summary: Add snapshot path to job startup message Key: FLINK-18955 URL: https://issues.apache.org/jira/browse/FLINK-18955 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.11.1 Reporter: Nico Kruber When a job is started from a checkpoint or savepoint (I'm using snapshot as the generic term below), the {{CheckpointCoordinator}} prints a log line like this: {code} 2020-08-13 13:50:51,418 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring job 220d8a4953cd40198b6eb3b1ec0cece0 from latest valid checkpoint: Checkpoint 357 @ 1597326576925 for 220d8a4953cd40198b6eb3b1ec0cece0. {code} I propose to add the path to the snapshot to this message, because which snapshot is taken for the restore may actually not be obvious to the user: even if a savepoint was specified in the job start command, e.g. in a Kubernetes pod spec, an HA store could overrule that decision and take a more recent snapshot instead. If that snapshot is a savepoint, it is not easy to map it to checkpoint IDs and find out which savepoint the job actually started from.
[jira] [Created] (FLINK-18806) Taskmanager doesn't start up with error in config
Nico Kruber created FLINK-18806: --- Summary: Taskmanager doesn't start up with error in config Key: FLINK-18806 URL: https://issues.apache.org/jira/browse/FLINK-18806 Project: Flink Issue Type: Bug Components: Deployment / Scripts Affects Versions: 1.11.1 Reporter: Nico Kruber With the following (wrong) configuration setting in {{flink-conf.yaml}}, a taskmanager will not start up, basically print nothing on the command line, and have no log file to look at: {code} taskmanager.memory.managed.fraction: '0.4' {code} Console output: {code} > ./bin/start-cluster.sh Starting cluster. Starting standalonesession daemon on host Nico-PC.lan. [ERROR] The execution result is empty. [ERROR] Could not get JVM parameters and dynamic configurations properly. {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
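For reference, the working vs. failing variants of the setting, assuming the quotes are indeed what trips up the startup scripts (their simple config parser keeps the quotes as part of the value, so the fraction no longer parses as a number):

```yaml
# works: the fraction is parsed as a number
taskmanager.memory.managed.fraction: 0.4

# fails at startup with "[ERROR] Could not get JVM parameters ...":
# taskmanager.memory.managed.fraction: '0.4'
```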
[jira] [Created] (FLINK-18769) Streaming Table job stuck when enabling minibatching
Nico Kruber created FLINK-18769: --- Summary: Streaming Table job stuck when enabling minibatching Key: FLINK-18769 URL: https://issues.apache.org/jira/browse/FLINK-18769 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.11.1 Reporter: Nico Kruber The following Table API streaming job is stuck when enabling mini batching {code} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); // disable mini-batching completely to get a result Configuration tableConf = tableEnv.getConfig() .getConfiguration(); tableConf.setString("table.exec.mini-batch.enabled", "true"); tableConf.setString("table.exec.mini-batch.allow-latency", "5 s"); tableConf.setString("table.exec.mini-batch.size", "5000"); tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); tableEnv.executeSql( "CREATE TABLE input_table (" + "location STRING, " + "population INT" + ") WITH (" + "'connector' = 'kafka', " + "'topic' = 'kafka_batching_input', " + "'properties.bootstrap.servers' = 'localhost:9092', " + "'format' = 'csv', " + "'scan.startup.mode' = 'earliest-offset'" + ")"); tableEnv.executeSql( "CREATE TABLE result_table WITH ('connector' = 'print') LIKE input_table (EXCLUDING OPTIONS)"); tableEnv .from("input_table") .groupBy($("location")) .select($("location").cast(DataTypes.CHAR(2)).as("location"), $("population").sum().as("population")) .executeInsert("result_table"); {code} I am using a pre-populated Kafka topic called {{kafka_batching_input}} with these elements: {code} "Berlin",1 "Berlin",2 {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18767) Streaming job stuck when disabling operator chaining
Nico Kruber created FLINK-18767: --- Summary: Streaming job stuck when disabling operator chaining Key: FLINK-18767 URL: https://issues.apache.org/jira/browse/FLINK-18767 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.11.1, 1.10.1, 1.9.3, 1.8.3 Reporter: Nico Kruber The following code is stuck sending data from the source to the map operator. Two settings seem to have an influence here: {{env.setBufferTimeout(-1);}} and {{env.disableOperatorChaining();}} - if I remove either of these, the job works as expected. (I pre-populated my Kafka topic with one element to reproduce easily) {code} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // comment either these two and the job works env.setBufferTimeout(-1); env.disableOperatorChaining(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", "test"); FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties); consumer.setStartFromEarliest(); DataStreamSource input = env.addSource( consumer); input .map((x) -> x) .print(); env.execute(); {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18276) NullPointerException when closing KafkaConsumer
Nico Kruber created FLINK-18276: --- Summary: NullPointerException when closing KafkaConsumer Key: FLINK-18276 URL: https://issues.apache.org/jira/browse/FLINK-18276 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.10.1, 1.9.3, 1.8.3, 1.11.0 Reporter: Nico Kruber {code} WARN org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher - Error while closing Kafka consumer java.lang.NullPointerException at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:282) {code} {{KafkaConsumerThread#reassignPartitions}} is temporarily setting {{consumer}} to {{null}} and if there is an exception (in this case, it was a timeout), the {{finally}} block in {{KafkaConsumerThread.run}} would fail with an NPE. Worse, {{KafkaConsumerThread#reassignPartitions}} puts the original consumer into {{consumerTmp}}, which is then never closed and may leak underlying (Kafka) resources. -- This message was sent by Atlassian Jira (v8.3.4#803005)
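The NPE side of this amounts to the classic snapshot-before-null-check pattern for a field that another thread may null out. A minimal standalone sketch (the {{Resource}} class stands in for the Kafka consumer; all names here are hypothetical, not Flink's actual code):

```java
public class SafeClose {

    // Stands in for the consumer object handed around by the fetcher thread.
    static class Resource implements java.io.Closeable {
        boolean closed = false;
        @Override public void close() { closed = true; }
    }

    // May be temporarily set to null by a concurrent reassignment.
    volatile Resource consumer;

    void run() {
        try {
            // ... fetch loop would go here ...
        } finally {
            // Copy the field to a local variable BEFORE the null check: reading
            // the volatile field twice (check, then close) is exactly the race
            // that produced the reported NullPointerException.
            Resource current = consumer;
            if (current != null) {
                current.close();
            }
        }
    }
}
```

This only cures the crash in the {{finally}} block; the leaked {{consumerTmp}} still needs to be closed on the reassignment path itself.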
[jira] [Created] (FLINK-18255) Add API annotations to RocksDB user-facing classes
Nico Kruber created FLINK-18255: --- Summary: Add API annotations to RocksDB user-facing classes Key: FLINK-18255 URL: https://issues.apache.org/jira/browse/FLINK-18255 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Affects Versions: 1.11.0 Reporter: Nico Kruber Several user-facing classes in {{flink-statebackend-rocksdb}} don't have any API annotations, not even {{@PublicEvolving}}. These should be added to clarify their usage. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18242) Custom OptionsFactory in user code not working when configured via code
Nico Kruber created FLINK-18242: --- Summary: Custom OptionsFactory in user code not working when configured via code Key: FLINK-18242 URL: https://issues.apache.org/jira/browse/FLINK-18242 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.10.1, 1.10.0 Reporter: Nico Kruber Attachments: DefaultConfigurableOptionsFactoryWithLog.java When I configure a custom {{OptionsFactory}} for RocksDB like this: {code:java} Configuration globalConfig = GlobalConfiguration.loadConfiguration(); String checkpointDataUri = globalConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY); RocksDBStateBackend stateBackend = new RocksDBStateBackend(checkpointDataUri); stateBackend.setOptions(new DefaultConfigurableOptionsFactoryWithLog()); env.setStateBackend((StateBackend) stateBackend);{code} it seems to be loaded {code:java} 2020-06-10 12:54:20,720 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using predefined options: DEFAULT. 2020-06-10 12:54:20,721 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using application-defined options factory: DefaultConfigurableOptionsFactoryWithLog{DefaultConfigurableOptionsFactory{configuredOptions={}}}. {code} but it seems like none of the options defined there are actually used. Just as an example, my factory does set the info log level to {{INFO_LEVEL}} but this is what you will see in the created RocksDB instance: {code:java} > cat /tmp/flink-io-c95e8f48-0daa-4fb9-a9a7-0e4fb42e9135/*/db/OPTIONS*|grep > info_log_level info_log_level=HEADER_LEVEL info_log_level=HEADER_LEVEL{code} Together with the bug from FLINK-18241, it seems I cannot re-activate the RocksDB log that we disabled in FLINK-15068. FLINK-15747 was aiming at changing that particular configuration, but the problem seems broader since {{setDbLogDir()}} was actually also ignored. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18241) Custom OptionsFactory in user code not working when configured via flink-conf.yaml
Nico Kruber created FLINK-18241: --- Summary: Custom OptionsFactory in user code not working when configured via flink-conf.yaml Key: FLINK-18241 URL: https://issues.apache.org/jira/browse/FLINK-18241 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.10.1, 1.10.0 Reporter: Nico Kruber Attachments: DefaultConfigurableOptionsFactoryWithLog.java It seems like Flink 1.10 broke custom {{OptionsFactory}} definitions via the {{state.backend.rocksdb.options-factory}} configuration if the implementation resides in the user-code jar file. This is particularly bad for debugging RocksDB issues since we disabled its (ever-growing) LOG file in FLINK-15068. If you look at the stack trace from the error below, you will notice that {{StreamExecutionEnvironment}} is not provided with a user-code classloader and will use the classloader of its own class, which is the parent loader and does not know about our {{OptionsFactory}}. This exact same code was working with Flink 1.9.3. (I believe putting the custom {{OptionsFactory}} into a separate jar file inside Flink's lib folder may be a workaround but that should ideally not be needed). {code:java} 2020-06-09 16:18:59,409 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Could not start cluster entrypoint StandaloneJobClusterEntryPoint. org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint StandaloneJobClusterEntryPoint. 
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:192) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:525) [flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:116) [flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] Caused by: org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent. at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:261) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:220) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:174) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:173) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] ... 2 more Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve the JobGraph. 
at org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:57) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory.createDispatcherRunner(DefaultDispatcherRunnerFactory.java:51) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:196) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:220) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:174) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:173) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] ... 2 more Caused by: org.apache.flink.util.FlinkException: Could not create the JobGraph from the provided user code jar. at org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.retrieveJobGraph(ClassPathJobGraphRetriever.java:114) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at
[jira] [Created] (FLINK-17706) Clarify licensing situation
Nico Kruber created FLINK-17706: --- Summary: Clarify licensing situation Key: FLINK-17706 URL: https://issues.apache.org/jira/browse/FLINK-17706 Project: Flink Issue Type: Sub-task Components: Benchmarks Affects Versions: 1.11.0 Reporter: Nico Kruber Fix For: 1.11.0 After enabling the rat plugin, it finds the following files with missing or invalid license headers: {code:java} src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java src/main/java/org/apache/flink/benchmark/functions/IntLongApplications.java src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java src/main/java/org/apache/flink/benchmark/functions/LongSource.java src/main/java/org/apache/flink/benchmark/functions/MultiplyByTwo.java src/main/java/org/apache/flink/benchmark/functions/MultiplyIntLongByTwo.java src/main/java/org/apache/flink/benchmark/functions/SuccessException.java src/main/java/org/apache/flink/benchmark/functions/SumReduce.java src/main/java/org/apache/flink/benchmark/functions/SumReduceIntLong.java src/main/java/org/apache/flink/benchmark/functions/ValidatingCounter.java src/main/java/org/apache/flink/benchmark/functions/QueuingLongSource.java src/main/java/org/apache/flink/benchmark/CollectSink.java src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java src/main/java/org/apache/flink/benchmark/operators/MultiplyByTwoCoStreamMap.java src/main/resources/avro/mypojo.avsc src/main/resources/protobuf/MyPojo.proto src/main/resources/thrift/mypojo.thrift save_jmh_result.py {code} The license should be clarified with the author and all contributors of that file. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17705) Add rat license checks
Nico Kruber created FLINK-17705: --- Summary: Add rat license checks Key: FLINK-17705 URL: https://issues.apache.org/jira/browse/FLINK-17705 Project: Flink Issue Type: Sub-task Components: Benchmarks Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.11.0 Before the code from [https://github.com/dataArtisans/flink-benchmarks/] is contributed, the licenses should be cleaned up and as a first step, we should set up the {{apache-rat-plugin}} similarly to how the Flink main repo uses it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17704) Allow running specific benchmarks from maven directly
Nico Kruber created FLINK-17704: --- Summary: Allow running specific benchmarks from maven directly Key: FLINK-17704 URL: https://issues.apache.org/jira/browse/FLINK-17704 Project: Flink Issue Type: Improvement Components: Benchmarks Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.11.0 Sometimes it would be nice to run a specific benchmark from maven directly. Currently this can be done via: {code:java} mvn -Dflink.version=1.11-SNAPSHOT clean package exec:exec -Dexec.executable=java -Dexec.args="-jar target/flink-hackathon-benchmarks-0.1.jar -rf csv org.apache.flink.benchmark.StreamNetworkThroughputBenchmarkExecutor"{code} but this is quite cumbersome and error-prone. Instead, I propose to simply define a property which by default runs all benchmarks but can be overridden on the command line to run a specific pattern (that is interpreted by JMH) like this: {code:java} mvn -Dflink.version=1.11-SNAPSHOT exec:exec -Dbenchmarks="org.apache.flink.benchmark.StreamNetworkThroughputBenchmarkExecutor" {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
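A sketch of how such a property could be wired in the {{pom.xml}} (the exact plugin configuration is abbreviated and assumed; only the {{benchmarks}} property name comes from the proposal above):

```xml
<properties>
  <!-- default pattern: run every benchmark under this package -->
  <benchmarks>org.apache.flink.benchmark</benchmarks>
</properties>

<!-- ... inside the exec-maven-plugin <arguments> list: ... -->
<argument>${benchmarks}</argument>
```

With that in place, `mvn exec:exec` runs everything, while `-Dbenchmarks=<JMH regex>` on the command line narrows the run to a specific pattern.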
[jira] [Created] (FLINK-17703) Default execution command fails due to the 'benchmark' profile being inactive
Nico Kruber created FLINK-17703: --- Summary: Default execution command fails due to the 'benchmark' profile being inactive Key: FLINK-17703 URL: https://issues.apache.org/jira/browse/FLINK-17703 Project: Flink Issue Type: Improvement Components: Benchmarks Affects Versions: 1.11.0 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.11.0 FLINK-17057 had some unfortunate side effects: by having the {{include-netty-tcnative-dynamic}} profile active by default, the {{benchmark}} profile was not active any more. Thus the following command that was typically used for running the benchmarks failed unless the {{benchmark}} profile was activated manually like this: {code:java} mvn -Dflink.version=1.11-SNAPSHOT clean package exec:exec -P benchmark{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17293) Port training exercises data sets to a generator
Nico Kruber created FLINK-17293: --- Summary: Port training exercises data sets to a generator Key: FLINK-17293 URL: https://issues.apache.org/jira/browse/FLINK-17293 Project: Flink Issue Type: Improvement Components: Documentation / Training / Exercises Affects Versions: 1.10.1 Reporter: Nico Kruber Assignee: Nico Kruber Currently, the training exercises still rely on training data hosted at Ververica: - http://training.ververica.com/trainingData/nycTaxiRides.gz and - http://training.ververica.com/trainingData/nycTaxiFares.gz Since this has always been a problem for users (and one additional step), I propose to rewrite the training sources to use a data generator instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17279) Use gradle build scans
Nico Kruber created FLINK-17279: --- Summary: Use gradle build scans Key: FLINK-17279 URL: https://issues.apache.org/jira/browse/FLINK-17279 Project: Flink Issue Type: Improvement Components: Training Exercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.10.1, 1.11.0 Gradle build scans [1] allow quick analysis of what happened if a CI build failed. It would upload a report with detailed info to [1]. See this for an example: https://gradle.com/s/g3tdhu47lntoc [1] https://scans.gradle.com/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17278) Add Travis to the training exercises
Nico Kruber created FLINK-17278: --- Summary: Add Travis to the training exercises Key: FLINK-17278 URL: https://issues.apache.org/jira/browse/FLINK-17278 Project: Flink Issue Type: Improvement Components: Training Exercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.10.1, 1.11.0 This will run all the tests and verify code quality. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17277) Apply IntelliJ recommendations to training exercises
Nico Kruber created FLINK-17277: --- Summary: Apply IntelliJ recommendations to training exercises Key: FLINK-17277 URL: https://issues.apache.org/jira/browse/FLINK-17277 Project: Flink Issue Type: Improvement Components: Training Exercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.10.1, 1.11.0 IntelliJ has a few recommendations on the original code of the training exercises. These should be addressed to serve as good reference code. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17276) Add checkstyle to training exercises
Nico Kruber created FLINK-17276: --- Summary: Add checkstyle to training exercises Key: FLINK-17276 URL: https://issues.apache.org/jira/browse/FLINK-17276 Project: Flink Issue Type: Improvement Components: Training Exercises Affects Versions: 1.10.0, 1.11.0 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.10.1, 1.11.0 Port Flink's checkstyle to the training exercises and adapt the code accordingly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17275) Add core training exercises
Nico Kruber created FLINK-17275: --- Summary: Add core training exercises Key: FLINK-17275 URL: https://issues.apache.org/jira/browse/FLINK-17275 Project: Flink Issue Type: New Feature Components: Training Exercises Affects Versions: 1.11.0 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.10.1, 1.11.0 Port the core training exercises, their descriptions, solutions, and tests from https://github.com/ververica/flink-training-exercises to Apache Flink. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17171) Blink planner fails to compile Table program with POJO source
Nico Kruber created FLINK-17171: --- Summary: Blink planner fails to compile Table program with POJO source Key: FLINK-17171 URL: https://issues.apache.org/jira/browse/FLINK-17171 Project: Flink Issue Type: Bug Components: Table SQL / Planner, Table SQL / Runtime Affects Versions: 1.10.0 Reporter: Nico Kruber Attachments: error.log It seems as if FLINK-13993 made the Table API (Blink planner) unusable for POJO sources where the POJO class is in user code. For https://github.com/ververica/lab-sql-vs-datastream/blob/master/src/main/java/com/ververica/LateralTableJoin.java I get the following Exception when I run it on a Flink 1.10.0 cluster (full version attached): {code} 2020-04-15 17:19:15,561 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Unhandled exception. org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. ... Caused by: org.codehaus.commons.compiler.CompileException: Line 28, Column 175: Cannot determine simple type name "com" ... at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78) {code} I enabled debug logs and this is what it is trying to compile: {code} @Override public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception { org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$15.toInternal((com.ververica.tables.FactTable.Fact) element.getValue()); ... {code} I use a standalone cluster and submit via web UI and also verified that my jar file does not contain anything else but its compiled classes. This code is working fine inside the IDE and was also working with Flink 1.10 and VVP 2.0 which did not use a dedicated class loader for user code. 
My guess is that the (generated) code does not have access to {{FactTable.Fact}}, and instead of a helpful error message, the Janino compiler complains that it cannot determine the simple type name "com". FLINK-7490 and FLINK-9220 seem related but are too old (legacy planner). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17143) Blog feed.xml should only contain excerpts, not full contents
Nico Kruber created FLINK-17143: --- Summary: Blog feed.xml should only contain excerpts, not full contents Key: FLINK-17143 URL: https://issues.apache.org/jira/browse/FLINK-17143 Project: Flink Issue Type: Improvement Components: Project Website Affects Versions: 1.10.0 Reporter: Nico Kruber The blog's atom 2.0 feed at https://flink.apache.org/blog/feed.xml contains the whole content of all blog posts while it should probably only contain the excerpts (and links to the full versions) as usual. This may save some unnecessary web site traffic (bytes) from users using the feed to get updates. -- This message was sent by Atlassian Jira (v8.3.4#803005)
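For a Jekyll-generated site, the usual change is to emit the post excerpt instead of the full content in the feed template. A sketch of one entry (the surrounding template and variable names are assumptions about the site's setup, not verified against the Flink website repo):

```xml
<entry>
  <title>{{ post.title | xml_escape }}</title>
  <link href="{{ site.url }}{{ post.url }}"/>
  <!-- excerpt instead of the full post body; readers follow the link -->
  <summary type="html">{{ post.excerpt | xml_escape }}</summary>
</entry>
```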
[jira] [Created] (FLINK-17057) Add OpenSSL micro-benchmarks
Nico Kruber created FLINK-17057: --- Summary: Add OpenSSL micro-benchmarks Key: FLINK-17057 URL: https://issues.apache.org/jira/browse/FLINK-17057 Project: Flink Issue Type: New Feature Components: Benchmarks Affects Versions: 1.11.0 Reporter: Nico Kruber Assignee: Nico Kruber Our JMH micro-benchmarks currently only run with Java's SSL implementation but it would also be nice to have them evaluated with OpenSSL. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17056) JMH main() methods call unrelated benchmarks
Nico Kruber created FLINK-17056: --- Summary: JMH main() methods call unrelated benchmarks Key: FLINK-17056 URL: https://issues.apache.org/jira/browse/FLINK-17056 Project: Flink Issue Type: Bug Components: Benchmarks Affects Versions: 1.10.0 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.11.0 Each benchmark class is accompanied by a {{public static main (String[] args)}} method which should run all benchmarks in that class. However, it just uses the class' simple name in a substring regexp and may thus also match further classes that were not intended to run. An example for this is the {{StreamNetworkThroughputBenchmarkExecutor}} which also runs benchmarks from {{DataSkewStreamNetworkThroughputBenchmarkExecutor}}. Using the canonical name instead fixes that behaviour. -- This message was sent by Atlassian Jira (v8.3.4#803005)
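The effect is easy to demonstrate with plain regex matching. This sketch is a deliberate simplification of JMH's include handling (modeled here as a substring regex over the fully-qualified benchmark name; {{JmhIncludeDemo}} and {{matches}} are made-up names):

```java
import java.util.regex.Pattern;

public class JmhIncludeDemo {

    // Simplified model of JMH's include filter: the pattern is a regular
    // expression matched anywhere inside the fully-qualified benchmark name.
    static boolean matches(String includePattern, String fqBenchmarkName) {
        return Pattern.compile(includePattern).matcher(fqBenchmarkName).find();
    }

    public static void main(String[] args) {
        String simple = "StreamNetworkThroughputBenchmarkExecutor";
        String canonical = "org.apache.flink.benchmark.StreamNetworkThroughputBenchmarkExecutor";
        String other = "org.apache.flink.benchmark.DataSkewStreamNetworkThroughputBenchmarkExecutor";

        // The simple name is a substring of the DataSkew variant's name,
        // so it accidentally matches that class as well:
        System.out.println(matches(simple, other));    // true

        // The canonical name only matches the intended class:
        System.out.println(matches(canonical, other)); // false
    }
}
```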
[jira] [Created] (FLINK-16890) Add AvroGeneric benchmark
Nico Kruber created FLINK-16890: --- Summary: Add AvroGeneric benchmark Key: FLINK-16890 URL: https://issues.apache.org/jira/browse/FLINK-16890 Project: Flink Issue Type: New Feature Components: Benchmarks Reporter: Nico Kruber Assignee: Nico Kruber Currently, serialization benchmarks for Avro cover specific records and Avro reflect. What is missing is GenericRecord which I propose to add to {{SerializationFrameworkAllBenchmarks}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16729) Offer an out-of-the-box Set serializer
Nico Kruber created FLINK-16729: --- Summary: Offer an out-of-the-box Set serializer Key: FLINK-16729 URL: https://issues.apache.org/jira/browse/FLINK-16729 Project: Flink Issue Type: New Feature Components: API / Type Serialization System Affects Versions: 1.10.0 Reporter: Nico Kruber Currently, Set types are serialized by Kryo by default, since Flink does not come with its own SetSerializer (only one for maps). While the MapSerializer can easily be adapted to cover sets, I think this should be available by default to get the maximum performance out of Flink (Kryo is slow!). When this is added, however, we need to provide a migration path for old state (or not use the new SetSerializer by default but offer it as an opt-in). This may need further investigation as to whether it is possible to migrate from Kryo automatically and whether we can check potential changes to the encapsulated entry class. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16664) Unable to set DataStreamSource parallelism to default (-1)
Nico Kruber created FLINK-16664: --- Summary: Unable to set DataStreamSource parallelism to default (-1) Key: FLINK-16664 URL: https://issues.apache.org/jira/browse/FLINK-16664 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.10.0 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.10.1, 1.11.0 A hotfix part of FLINK-14405 actually breaks setting the parallelism to its default value for datastream sources, i.e. using value {{-1}}. This is because of a small typo: instead of {code:java} OperatorValidationUtils.validateParallelism(parallelism, isParallel); {code} this is called in org.apache.flink.streaming.api.datastream.DataStreamSource#setParallelism: {code:java} OperatorValidationUtils.validateMaxParallelism(parallelism, isParallel); {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16576) State inconsistency on restore with memory state backends
Nico Kruber created FLINK-16576: --- Summary: State inconsistency on restore with memory state backends Key: FLINK-16576 URL: https://issues.apache.org/jira/browse/FLINK-16576 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.10.0, 1.9.2 Reporter: Nico Kruber Fix For: 1.9.3, 1.10.1, 1.11.0 I occasionally see a few state inconsistencies with the {{TopSpeedWindowing}} example in Flink. Restore would fail with any of these causes, but only for the memory state backends and only with some combinations of the parallelism I took the savepoint with and the parallelism I restore the job with: {code:java} java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=64, endKeyGroup=95} does not contain key group 97 {code} or {code:java} java.lang.NullPointerException at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280) {code} or {code:java} java.io.IOException: Corrupt stream, found tag: 8 at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217) {code} I managed to make it reproducible in a test that I quickly hacked together in [https://github.com/NicoK/flink/blob/state.corruption.debug/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingSavepointRestoreITCase.java] (please check out the whole repository since I had to change some dependencies). 
In a bit more detail, this is what I discovered before, also with a manual savepoint on S3: Savepoint that was taken with parallelism 2 (p=2) and shows the restore failure in three different ways (all running in Flink 1.10.0; but I also see it in Flink 1.9): * first of all, if I try to restore with p=2, everything is fine * if I restore with p=4 I get an exception like the one mentioned above: {code:java} 2020-03-11 15:53:35,149 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction) -> Sink: Print to Std. Out (3/4) (2ecdb03905cc8a376d43b086925452a6) switched from RUNNING to FAILED. java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for EvictingWindowOperator_90bea66de1c231edf33913ecd54406c1_(3/4) from any of the 1 provided restore options. 
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131) ... 9 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116) at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ... 11 more Caused by: java.lang.IllegalArgumentException:
[jira] [Created] (FLINK-16556) TopSpeedWindowing should implement checkpointing for its source
Nico Kruber created FLINK-16556: --- Summary: TopSpeedWindowing should implement checkpointing for its source Key: FLINK-16556 URL: https://issues.apache.org/jira/browse/FLINK-16556 Project: Flink Issue Type: Bug Components: Examples Affects Versions: 1.10.0 Reporter: Nico Kruber {{org.apache.flink.streaming.examples.windowing.TopSpeedWindowing.CarSource}} does not implement checkpointing of its state, namely the current speeds and distances per car. The main problem with this is that the window trigger only fires once the distance has increased by at least 50, but after a restore, the distance is reset to 0 and the job may thus not produce output for a while. Either the distance calculation could use {{Math.abs}}, or the source needs proper checkpointing, optionally allowing the number of cars to increase/decrease. -- This message was sent by Atlassian Jira (v8.3.4#803005)
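The {{Math.abs}} variant suggested above can be illustrated in isolation (a minimal, self-contained sketch of the delta check only, not of the actual {{DeltaTrigger}} wiring; the class and method names are made up for illustration, the threshold of 50 matches the example's trigger delta):

```java
// Sketch: a restore-robust delta check for the TopSpeedWindowing trigger.
// After a restore, the tracked distance resets to 0 while the last value
// that fired the trigger may still be large; taking the absolute difference
// lets the trigger fire again instead of stalling until the distance
// catches up to its pre-restore value.
public class DeltaCheck {
    private static final double TRIGGER_DELTA = 50.0;

    static boolean shouldFire(double lastFiredDistance, double currentDistance) {
        return Math.abs(currentDistance - lastFiredDistance) >= TRIGGER_DELTA;
    }

    public static void main(String[] args) {
        // normal operation: distance grows past the threshold
        System.out.println(shouldFire(100.0, 160.0)); // true
        // after restore: distance was reset to 0, delta is -90
        System.out.println(shouldFire(100.0, 10.0));  // true with abs, false without
        // small movement: no firing either way
        System.out.println(shouldFire(100.0, 120.0)); // false
    }
}
```

With a plain {{currentDistance - lastFiredDistance >= 50}} check, the second case would silently stay below the threshold until the distance counter catches up again.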
[jira] [Created] (FLINK-16461) Multiple Bindings with SLF4J
Nico Kruber created FLINK-16461: --- Summary: Multiple Bindings with SLF4J Key: FLINK-16461 URL: https://issues.apache.org/jira/browse/FLINK-16461 Project: Flink Issue Type: Bug Components: Benchmarks Affects Versions: 1.11.0 Reporter: Nico Kruber Running benchmarks currently produces this warning: {code} SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/nico/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/home/nico/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] {code} The cause is visible in the dependency tree: {code} org.apache.flink.benchmark:flink-hackathon-benchmarks:jar:0.1 +- org.apache.flink:flink-test-utils-junit:jar:1.11-SNAPSHOT:compile | +- org.apache.logging.log4j:log4j-slf4j-impl:jar:2.12.1:compile +- org.slf4j:slf4j-log4j12:jar:1.7.7:compile {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
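One possible fix, assuming the log4j2 binding ({{log4j-slf4j-impl}}) is the one to keep: drop the direct {{slf4j-log4j12}} dependency from the benchmark project's pom. Conversely, if the legacy binding were the desired one, the transitive binding could be excluded; a sketch of that exclusion (versions taken from the dependency tree above):

```xml
<!-- Sketch: exclude the log4j2 SLF4J binding that flink-test-utils-junit
     pulls in transitively, so that only one StaticLoggerBinder remains
     on the classpath. Only one of the two bindings should survive. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils-junit</artifactId>
  <version>1.11-SNAPSHOT</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```

Whichever binding is removed, {{mvn dependency:tree}} should afterwards show exactly one `StaticLoggerBinder` provider.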
[jira] [Created] (FLINK-16459) Add Serialization benchmark for Apache Thrift
Nico Kruber created FLINK-16459: --- Summary: Add Serialization benchmark for Apache Thrift Key: FLINK-16459 URL: https://issues.apache.org/jira/browse/FLINK-16459 Project: Flink Issue Type: New Feature Components: Benchmarks Reporter: Nico Kruber Assignee: Nico Kruber The current set of serializers for which we do performance benchmarks in {{SerializationFrameworkAllBenchmarks}} contains: * POJO * Tuple * Row * Avro * Kryo We should also add one for Apache Thrift (via Kryo) to see how that goes. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16460) Add Serialization benchmark for Protobuf
Nico Kruber created FLINK-16460: --- Summary: Add Serialization benchmark for Protobuf Key: FLINK-16460 URL: https://issues.apache.org/jira/browse/FLINK-16460 Project: Flink Issue Type: New Feature Components: Benchmarks Reporter: Nico Kruber Assignee: Nico Kruber The current set of serializers for which we do performance benchmarks in {{SerializationFrameworkAllBenchmarks}} contains: * POJO * Tuple * Row * Avro * Kryo We should also add one for Protobuf (via Kryo) to see how that goes. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16301) Annoying "Cannot find FunctionDefinition" messages with SQL for f_proctime or =
Nico Kruber created FLINK-16301: --- Summary: Annoying "Cannot find FunctionDefinition" messages with SQL for f_proctime or = Key: FLINK-16301 URL: https://issues.apache.org/jira/browse/FLINK-16301 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.10.0 Reporter: Nico Kruber When running the following SQL query {code} SELECT D1.col1 AS A, D1.col2 AS B, D1.col3 AS C, D1.col4 AS D, D1.col5 AS E, D2.col1 AS F, D2.col2 AS G, D2.col3 AS H, D2.col4 AS I, D2.col5 AS J, D3.col1 AS K, D3.col2 AS L, D3.col3 AS M, D3.col4 AS N, D3.col5 AS O, D4.col1 AS P, D4.col2 AS Q, D4.col3 AS R, D4.col4 AS S, D4.col5 AS T, D5.col1 AS U, D5.col2 AS V, D5.col3 AS W, D5.col4 AS X, D5.col5 AS Y FROM fact_table, LATERAL TABLE (dimension_table1(f_proctime)) AS D1, LATERAL TABLE (dimension_table2(f_proctime)) AS D2, LATERAL TABLE (dimension_table3(f_proctime)) AS D3, LATERAL TABLE (dimension_table4(f_proctime)) AS D4, LATERAL TABLE (dimension_table5(f_proctime)) AS D5 WHERE fact_table.dim1 = D1.id AND fact_table.dim2 = D2.id AND fact_table.dim3 = D3.id AND fact_table.dim4 = D4.id AND fact_table.dim5 = D5.id {code} with the Blink planner, it prints a lot of bogus warnings about unknown functions for things like {{f_proctime}} or {{=}} at INFO level; these should be logged at DEBUG level instead in order not to bother users.
The messages I got are: {code} 13:33:59,590 INFO org.apache.flink.table.module.ModuleManager - Cannot find FunctionDefinition f_proctime from any loaded modules 13:33:59,641 INFO org.apache.flink.table.module.ModuleManager - Cannot find FunctionDefinition f_proctime from any loaded modules 13:33:59,644 INFO org.apache.flink.table.module.ModuleManager - Cannot find FunctionDefinition f_proctime from any loaded modules 13:33:59,647 INFO org.apache.flink.table.module.ModuleManager - Cannot find FunctionDefinition f_proctime from any loaded modules 13:33:59,650 INFO org.apache.flink.table.module.ModuleManager - Cannot find FunctionDefinition f_proctime from any loaded modules 13:33:59,662 INFO org.apache.flink.table.module.ModuleManager - Cannot find FunctionDefinition = from any loaded modules 13:33:59,665 INFO org.apache.flink.table.module.ModuleManager - Cannot find FunctionDefinition = from any loaded modules 13:33:59,666 INFO org.apache.flink.table.module.ModuleManager - Got FunctionDefinition and from module core 13:33:59,667 INFO org.apache.flink.table.module.ModuleManager - Cannot find FunctionDefinition = from any loaded modules 13:33:59,668 INFO org.apache.flink.table.module.ModuleManager - Got FunctionDefinition and from module core 13:33:59,669 INFO org.apache.flink.table.module.ModuleManager - Cannot find FunctionDefinition = from any loaded modules 13:33:59,670 INFO org.apache.flink.table.module.ModuleManager - Got FunctionDefinition and from module core {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
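Until the log level is lowered in Flink itself, a user-side workaround is to raise the threshold for that one logger in the logging configuration; a sketch in log4j 1.x properties syntax (as used by Flink 1.10's default {{log4j.properties}}):

```properties
# Raise the threshold for the chatty ModuleManager logger only;
# everything else stays at the configured root level.
log4j.logger.org.apache.flink.table.module.ModuleManager=WARN
```

This suppresses the "Cannot find FunctionDefinition" and "Got FunctionDefinition" INFO messages without touching any other Table API logging.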
[jira] [Created] (FLINK-16282) Wrong exception using DESCRIBE SQL command
Nico Kruber created FLINK-16282: --- Summary: Wrong exception using DESCRIBE SQL command Key: FLINK-16282 URL: https://issues.apache.org/jira/browse/FLINK-16282 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.10.0 Reporter: Nico Kruber When trying to describe a table like this {code:java} Table facttable = tEnv.sqlQuery("DESCRIBE fact_table"); {code} currently, you get a strange exception which should rather be a "not supported" exception {code} Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 10 to line 1, column 19: Column 'fact_table' not found in any table at org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:130) at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) at org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:124) at org.apache.flink.table.planner.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464) at com.ververica.LateralTableJoin.main(LateralTableJoin.java:92) Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 10 to line 1, column 19: Column 'fact_table' not found in any table at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841) at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateIdentifier(SqlValidatorImpl.java:2943) at org.apache.calcite.sql.SqlIdentifier.validateExpr(SqlIdentifier.java:297) at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:407) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:5304) at org.apache.calcite.sql.SqlCall.validate(SqlCall.java:116) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:943) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:650) at org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:126) ... 5 more Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'fact_table' not found in any table at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) ... 17 more {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
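Until DESCRIBE is supported (or rejected with a proper "not supported" message), the schema of a registered table can be inspected through the Table API instead; a rough, untested sketch, assuming a Flink 1.10 {{TableEnvironment}} with {{fact_table}} registered ({{describe}} is a made-up helper name):

```java
// Sketch (untested, requires Flink on the classpath): print the schema of a
// registered table via the Table API instead of a DESCRIBE statement, which
// sqlQuery() does not handle in Flink 1.10.
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;

public class DescribeWorkaround {
    static void describe(TableEnvironment tEnv, String name) {
        Table table = tEnv.from(name);          // resolve the table by name
        TableSchema schema = table.getSchema(); // field names and types
        System.out.println(schema);
    }
}
```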
[jira] [Created] (FLINK-15863) Fix docs stating that savepoints are relocatable
Nico Kruber created FLINK-15863: --- Summary: Fix docs stating that savepoints are relocatable Key: FLINK-15863 URL: https://issues.apache.org/jira/browse/FLINK-15863 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.9.2, 1.10.0 Reporter: Nico Kruber This section from https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#preconditions states that savepoints are relocatable which they are not yet (see FLINK-5763). It should be fixed and/or removed; I'm unsure what change from 1.3 it should actually reflect. {quote}Another important precondition is that for savepoints taken before Flink 1.3.x, all the savepoint data must be accessible from the new installation and reside under the same absolute path. Before Flink 1.3.x, the savepoint data is typically not self-contained in just the created savepoint file. Additional files can be referenced from inside the savepoint file (e.g. the output from state backend snapshots). Since Flink 1.3.x, this is no longer a limitation; savepoints can be relocated using typical filesystem operations..{quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)