[jira] [Created] (FLINK-26040) PrometheusReporterEndToEndITCase hang on azure pipeline

2022-02-08 Thread Yun Gao (Jira)
Yun Gao created FLINK-26040:
---

 Summary: PrometheusReporterEndToEndITCase hang on azure pipeline
 Key: FLINK-26040
 URL: https://issues.apache.org/jira/browse/FLINK-26040
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.13.5
Reporter: Yun Gao



{code:java}
"main" #1 prio=5 os_prio=0 tid=0x7ffb1400b800 nid=0x9590b in Object.wait() [0x7ffb199a8000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x8a2fd4b8> (a java.lang.UNIXProcess)
at java.lang.Object.wait(Object.java:460)
at java.util.concurrent.TimeUnit.timedWait(TimeUnit.java:348)
at java.lang.UNIXProcess.waitFor(UNIXProcess.java:410)
- locked <0x8a2fd4b8> (a java.lang.UNIXProcess)
at org.apache.flink.tests.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:134)
at org.apache.flink.tests.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:115)
at org.apache.flink.tests.util.flink.FlinkDistribution.startFlinkCluster(FlinkDistribution.java:119)
at org.apache.flink.tests.util.flink.LocalStandaloneFlinkResource.startCluster(LocalStandaloneFlinkResource.java:133)
at org.apache.flink.metrics.prometheus.tests.PrometheusReporterEndToEndITCase.testReporter(PrometheusReporterEndToEndITCase.java:231)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)

{code}

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30955=logs=08866332-78f7-59e4-4f7e-49a56faa3179=7f606211-1454-543c-70ab-c7a028a1ce8c=30397



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26039) Incorrect value getter in map unnest table function

2022-02-08 Thread Han (Jira)
Han created FLINK-26039:
---

 Summary: Incorrect value getter in map unnest table function
 Key: FLINK-26039
 URL: https://issues.apache.org/jira/browse/FLINK-26039
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.14.3
Reporter: Han
 Fix For: 1.15.0


Suppose we have a map field that needs to be expanded.

 
{code:java}
CREATE TABLE t (
    id INT,
    map_field MAP<STRING, INT>
) WITH (
    -- ...
);

SELECT id, k, v FROM t, unnest(map_field) as A(k, v);{code}

We will get the following runtime exception:
{code:java}
Caused by: java.lang.ClassCastException: org.apache.flink.table.data.binary.BinaryStringData cannot be cast to java.lang.Integer
    at org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149)
    at org.apache.flink.table.data.utils.JoinedRowData.getInt(JoinedRowData.java:149)
    at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245)
    at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:80)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44)
    at org.apache.flink.table.runtime.collector.TableFunctionCollector.outputResult(TableFunctionCollector.java:68)
    at StreamExecCorrelate$10$TableFunctionCollector$4.collect(Unknown Source)
    at org.apache.flink.table.runtime.collector.WrappingCollector.outputResult(WrappingCollector.java:39)
    at StreamExecCorrelate$10$TableFunctionResultConverterCollector$8.collect(Unknown Source)
    at org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:197)
    at org.apache.flink.table.runtime.functions.SqlUnnestUtils$MapUnnestTableFunction.eval(SqlUnnestUtils.java:169)
    at StreamExecCorrelate$10.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
{code}
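
For illustration, the failure mode amounts to a field getter being created for the wrong position/type and then applied to the row, so a STRING field ends up being read as INT. A minimal sketch using the public RowData API (this reproduces the symptom; it is not the actual generated code):
{code:java}
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.IntType;

public class FieldGetterMismatch {
    public static void main(String[] args) {
        // a row whose field 0 is a STRING ("k"), as produced for the map key
        GenericRowData row = GenericRowData.of(StringData.fromString("k"));
        // a getter built for an INT field at position 0, i.e. the wrong type for that position
        RowData.FieldGetter getter = RowData.createFieldGetter(new IntType(), 0);
        getter.getFieldOrNull(row); // throws ClassCastException: BinaryStringData -> Integer
    }
}
{code}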



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26038) Support delay message on Pulsar sink

2022-02-08 Thread Yufan Sheng (Jira)
Yufan Sheng created FLINK-26038:
---

 Summary: Support delay message on Pulsar sink
 Key: FLINK-26038
 URL: https://issues.apache.org/jira/browse/FLINK-26038
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Pulsar
Reporter: Yufan Sheng






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26037) TaskManagerRunnerTest JVM crash with exit code 239

2022-02-08 Thread Yun Gao (Jira)
Yun Gao created FLINK-26037:
---

 Summary: TaskManagerRunnerTest JVM crash with exit code 239
 Key: FLINK-26037
 URL: https://issues.apache.org/jira/browse/FLINK-26037
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.15.0
Reporter: Yun Gao



{code:java}
Feb 09 02:22:52 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test (default-test) on project flink-runtime: There are test failures.
Feb 09 02:22:52 [ERROR] 
Feb 09 02:22:52 [ERROR] Please refer to /__w/2/s/flink-runtime/target/surefire-reports for the individual test results.
Feb 09 02:22:52 [ERROR] Please refer to dump files (if any exist) [date].dump, [date]-jvmRun[N].dump and [date].dumpstream.
Feb 09 02:22:52 [ERROR] ExecutionException The forked VM terminated without properly saying goodbye. VM crash or System.exit called?
Feb 09 02:22:52 [ERROR] Command was /bin/sh -c cd /__w/2/s/flink-runtime && /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -Xms256m -Xmx2048m -Dmvn.forkNumber=2 -XX:+UseG1GC -Duser.country=US -Duser.language=en -jar /__w/2/s/flink-runtime/target/surefire/surefirebooter2302747870022684931.jar /__w/2/s/flink-runtime/target/surefire 2022-02-09T01-58-20_619-jvmRun2 surefire8313405181432833392tmp surefire_2318122906826894495431tmp
Feb 09 02:22:52 [ERROR] Error occurred in starting fork, check output in log
Feb 09 02:22:52 [ERROR] Process Exit Code: 239
Feb 09 02:22:52 [ERROR] Crashed tests:
Feb 09 02:22:52 [ERROR] org.apache.flink.runtime.taskexecutor.TaskManagerRunnerTest
Feb 09 02:22:52 [ERROR] org.apache.maven.surefire.booter.SurefireBooterForkException: ExecutionException The forked VM terminated without properly saying goodbye. VM crash or System.exit called?
Feb 09 02:22:52 [ERROR] Command was /bin/sh -c cd /__w/2/s/flink-runtime && /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -Xms256m -Xmx2048m -Dmvn.forkNumber=2 -XX:+UseG1GC -Duser.country=US -Duser.language=en -jar /__w/2/s/flink-runtime/target/surefire/surefirebooter2302747870022684931.jar /__w/2/s/flink-runtime/target/surefire 2022-02-09T01-58-20_619-jvmRun2 surefire8313405181432833392tmp surefire_2318122906826894495431tmp
Feb 09 02:22:52 [ERROR] Error occurred in starting fork, check output in log
Feb 09 02:22:52 [ERROR] Process Exit Code: 239
Feb 09 02:22:52 [ERROR] Crashed tests:
Feb 09 02:22:52 [ERROR] org.apache.flink.runtime.taskexecutor.TaskManagerRunnerTest
Feb 09 02:22:52 [ERROR] at org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:532)
Feb 09 02:22:52 [ERROR] at org.apache.maven.plugin.surefire.booterclient.ForkStarter.runSuitesForkPerTestSet(ForkStarter.java:479)
Feb 09 02:22:52 [ERROR] at org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:322)
Feb 09 02:22:52 [ERROR] at org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:266)
Feb 09 02:22:52 [ERROR] at org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeProvider(AbstractSurefireMojo.java:1314)
Feb 09 02:22:52 [ERROR] at org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeAfterPreconditionsChecked(AbstractSurefireMojo.java:1159)
Feb 09 02:22:52 [ERROR] at org.apache.maven.plugin.surefire.AbstractSurefireMojo.execute(AbstractSurefireMojo.java:932)
Feb 09 02:22:52 [ERROR] at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132)
Feb 09 02:22:52 [ERROR] at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
Feb 09 02:22:52 [ERROR] at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
Feb 09 02:22:52 [ERROR] at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
Feb 09 02:22:52 [ERROR] at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
Feb 09 02:22:52 [ERROR] at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
Feb 09 02:22:52 [ERROR] at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
Feb 09 02:22:52 [ERROR] at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:120)
Feb 09 02:22:52 [ERROR] at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:355)
Feb 09 02:22:52 [ERROR] at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:155)
Feb 09 02:22:52 [ERROR] at org.apache.maven.cli.MavenCli.execute(MavenCli.java:584)

{code}

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30956=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7=9164



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26036) LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory timeout on azure

2022-02-08 Thread Yun Gao (Jira)
Yun Gao created FLINK-26036:
---

 Summary: 
LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory 
timeout on azure
 Key: FLINK-26036
 URL: https://issues.apache.org/jira/browse/FLINK-26036
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.15.0
Reporter: Yun Gao



{code:java}
2022-02-09T02:18:17.1827314Z Feb 09 02:18:14 [ERROR] org.apache.flink.test.recovery.LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory  Time elapsed: 62.252 s  <<< ERROR!
2022-02-09T02:18:17.1827940Z Feb 09 02:18:14 java.util.concurrent.TimeoutException
2022-02-09T02:18:17.1828450Z Feb 09 02:18:14    at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
2022-02-09T02:18:17.1829040Z Feb 09 02:18:14    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
2022-02-09T02:18:17.1829752Z Feb 09 02:18:14    at org.apache.flink.test.recovery.LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory(LocalRecoveryITCase.java:115)
2022-02-09T02:18:17.1830407Z Feb 09 02:18:14    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2022-02-09T02:18:17.1830954Z Feb 09 02:18:14    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2022-02-09T02:18:17.1831582Z Feb 09 02:18:14    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2022-02-09T02:18:17.1832135Z Feb 09 02:18:14    at java.lang.reflect.Method.invoke(Method.java:498)
2022-02-09T02:18:17.1832697Z Feb 09 02:18:14    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
2022-02-09T02:18:17.1833566Z Feb 09 02:18:14    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
2022-02-09T02:18:17.1834394Z Feb 09 02:18:14    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
2022-02-09T02:18:17.1835125Z Feb 09 02:18:14    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
2022-02-09T02:18:17.1835875Z Feb 09 02:18:14    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
2022-02-09T02:18:17.1836565Z Feb 09 02:18:14    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
2022-02-09T02:18:17.1837294Z Feb 09 02:18:14    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
2022-02-09T02:18:17.1838007Z Feb 09 02:18:14    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
2022-02-09T02:18:17.1838743Z Feb 09 02:18:14    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
2022-02-09T02:18:17.1839499Z Feb 09 02:18:14    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
2022-02-09T02:18:17.1840224Z Feb 09 02:18:14    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
2022-02-09T02:18:17.1840952Z Feb 09 02:18:14    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
2022-02-09T02:18:17.1841616Z Feb 09 02:18:14    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
2022-02-09T02:18:17.1842257Z Feb 09 02:18:14    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
2022-02-09T02:18:17.1842951Z Feb 09 02:18:14    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
2022-02-09T02:18:17.1843681Z Feb 09 02:18:14    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
2022-02-09T02:18:17.1844782Z Feb 09 02:18:14    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
2022-02-09T02:18:17.1845603Z Feb 09 02:18:14    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
2022-02-09T02:18:17.1846375Z Feb 09 02:18:14    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
2022-02-09T02:18:17.1847084Z Feb 09 02:18:14    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
2022-02-09T02:18:17.1847785Z Feb 09 02:18:14    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
2022-02-09T02:18:17.1848490Z Feb 09 02:18:14 
{code}

[jira] [Created] (FLINK-26035) Rework loader-bundle into separate module

2022-02-08 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-26035:


 Summary: Rework loader-bundle into separate module
 Key: FLINK-26035
 URL: https://issues.apache.org/jira/browse/FLINK-26035
 Project: Flink
  Issue Type: Technical Debt
  Components: Build System, Table SQL / Planner
Affects Versions: 1.15.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.15.0


The flink-table-planner build currently creates two artifacts: one jar containing the planner and various dependencies for the cases where the planner is used directly, and another jar that additionally bundles Scala for cases where the loader is used.

The latter artifact is purely an intermediate build artifact, and as such we usually wouldn't want to publish it. This is particularly important because this jar doesn't have a correct NOTICE file, and having different NOTICE files for different artifacts is surprisingly tricky.

We should rework this into a separate module.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[DISCUSS] Support the merge statement in FlinkSQL

2022-02-08 Thread zhou chao
Hi, devs!
Jingfeng and I would like to start a discussion about the MERGE statement. The
discussion consists of two parts: in the first part, we want to explore and
collect the use cases and motivations of MERGE statement users; in the second
part, we want to find out whether it is possible for Flink SQL to support the
MERGE statement.

Before driving the first topic, we want to introduce the definition and
benefits of the MERGE statement. MERGE is a very popular SQL clause: it can
handle inserts, updates, and deletes in a single transaction, without separate
logic for each of them, and conditions can be specified separately for each
insert, update, or delete action. Many engines/DBs already support this
feature, for example SQL Server[1], Spark[2], Hive[3], and pgSQL[4].
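
To make the discussion concrete, here is a minimal example of the syntax
(illustrative table and column names, following the form used by the engines
above):
{code:java}
MERGE INTO orders AS t
USING order_updates AS s
ON t.order_id = s.order_id
WHEN MATCHED AND s.op = 'DELETE' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.amount = s.amount, t.status = s.status
WHEN NOT MATCHED THEN INSERT (order_id, amount, status)
    VALUES (s.order_id, s.amount, s.status);
{code}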

Our use case: 
Order analysis & processing is one of our most important scenarios, but
sometimes an updated order arrives long after the last record with the same
primary key; by then, the state for that key has already expired, so a wrong
aggregation result is produced. In this situation we use the MERGE statement
in a batch job to correct the results; currently we use Spark + Iceberg
internally for this. In the future we want to unify batch & streaming
internally using Flink SQL, so it would be better if Flink could support the
MERGE statement. If you have other use cases and opinions, please share them here.

Currently, Calcite does not have good support for the MERGE statement, and
there is a Jira, CALCITE-4338[5], tracking this. Could we support the MERGE
statement relying on the limited support in Calcite 1.26.0? I wrote a short
doc[6] to drive this, just to find out whether it is possible for Flink SQL
to support the MERGE statement.

Looking forward to your feedback, thanks. 

best,
zoucao


[1]https://docs.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?redirectedfrom=MSDN=sql-server-ver15
[2]https://issues.apache.org/jira/browse/SPARK-28893
[3]https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Merge
[4]https://www.postgresql.org/message-id/attachment/23520/sql-merge.html
[5]https://issues.apache.org/jira/browse/CALCITE-4338
[6]https://docs.google.com/document/d/12wwCqK6zfWGs84ijFZmGPJqCYfYHUPmfx5CvzUkVrw4/edit?usp=sharing

[jira] [Created] (FLINK-26034) Add maven wrapper for flink

2022-02-08 Thread Aiden Gong (Jira)
Aiden Gong created FLINK-26034:
--

 Summary: Add maven wrapper for flink
 Key: FLINK-26034
 URL: https://issues.apache.org/jira/browse/FLINK-26034
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Affects Versions: 1.15.0
Reporter: Aiden Gong
 Fix For: 1.15.0


IntelliJ IDEA now supports this feature. It would be very helpful for us.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] Checkpointing (partially) failing jobs

2022-02-08 Thread Gen Luo
Hi Chesney and Piotr,

I have seen some jobs with tens of independent vertices that process data
for the same business. The sub jobs should be started or stopped together.
Splitting them into separate jobs means the user has to manage them
separately. But in fact the jobs were running in per-job mode, and maybe
there's now a better choice. Let's see if others have some more valuable
cases.

By the way, I'd like to point out that if we can checkpoint pipeline
regions individually, even a job with only one job graph, if it has no
all-to-all edges connecting all vertices into one pipeline region, may
benefit from this effort, since any failure, long-time pause or
backpressure in a pipeline region will not block the checkpointing of other
regions.

And @Piotr, this is why I think this discussion may relate to task-local
checkpoints. Both of them require checkpointing parts of a job
individually, and restoring only a part of the job, without breaking
consistency. The main difference is that to maintain consistency,
task-local checkpoints have to handle the channel data. This is omitted in
approximate task-local recovery, since consistency is not guaranteed
there, and this is why approximate task-local recovery may use a part of
the global snapshot rather than individually checkpointing each subtask.
In pipeline-region checkpoints, however, consistency is guaranteed
naturally. We can focus on how to checkpoint individually, an effort that
is probably necessary anyway if we want to implement task-local
checkpointing with a consistency guarantee.

On Tue, Feb 8, 2022 at 7:41 PM 丛鹏  wrote:

> Hi guys, if I understand it correctly, will only some checkpoints be
> recovered when there is an error in a Flink batch job?
>
> Piotr Nowojski  于2022年2月8日周二 19:05写道:
>
>> Hi,
>>
>> I second Chesnay's comment and would like to better understand the
>> motivation behind this. At the surface it sounds to me like this might
>> require quite a bit of work for a very narrow use case.
>>
>> At the same time I have a feeling that Yuan, you are mixing this feature
>> request (checkpointing subgraphs/pipeline regions independently) and a
>> very
>> very different issue of "task local checkpoints"? Those problems are kind
>> of similar, but not quite.
>>
>> Best,
>> Piotrek
>>
>> wt., 8 lut 2022 o 11:44 Chesnay Schepler  napisał(a):
>>
>> > Could someone expand on these operational issues you're facing when
>> > achieving this via separate jobs?
>> >
>> > I feel like we're skipping a step, arguing about solutions without even
>> > having discussed the underlying problem.
>> >
>> > On 08/02/2022 11:25, Gen Luo wrote:
>> > > Hi,
>> > >
>> > > @Yuan
>> > > Do you mean that there should be no shared state between source
>> subtasks?
>> > > Sharing state between checkpoints of a specific subtask should be
>> fine.
>> > >
>> > > Sharing state between subtasks of a task can be an issue, no matter
>> > whether
>> > > it's a source. That's also what I was afraid of in the previous
>> replies.
>> > In
>> > > one word, if the behavior of a pipeline region can somehow influence
>> the
>> > > state of other pipeline regions, their checkpoints have to be aligned
>> > > before rescaling.
>> > >
>> > > On Tue, Feb 8, 2022 at 5:27 PM Yuan Mei 
>> wrote:
>> > >
>> > >> Hey Folks,
>> > >>
>> > >> Thanks for the discussion!
>> > >>
>> > >> *Motivation and use cases*
>> > >> I think the motivation and use cases are very clear and I do not have
>> > >> doubts on this part.
>> > >> A typical use case is ETL with two-phase commit, where hundreds of
>> > >> partitions can be blocked by a single straggler (a single task's
>> > >> checkpoint abortion can affect all of them, not necessarily a failure).
>> > >>
>> > >> *Source offset redistribution*
>> > >> As for the known sources & implementations for Flink, I cannot find a
>> > >> case that does not work, *for now*.
>> > >> I need to dig a bit more: how splits are tracked as assigned, not
>> > >> successfully processed, successfully processed, etc.
>> > >> I guess it is a single shared source OperatorCoordinator. And how is
>> > >> this *shared* state (between tasks) preserved?
>> > >>
>> > >> *Input partitions/splits treated completely independently from each
>> > >> other*
>> > >> Of this part I am still not sure, given the possibly shared source
>> > >> state mentioned in the section above.
>> > >>
>> > >> To Thomas:
>> > >>> In Yuan's example, is there a reason why CP8 could not be promoted to
>> > >>> CP10 by the coordinator for PR2 once it receives the notification that
>> > >>> CP10 did not complete? It appears that should be possible since in its
>> > >>> effect it should be no different than no data processed between CP8
>> > >>> and CP10?
>> > >> Not sure what "promoted" means here, but
>> > >> 1. I guess it does not matter whether it is CP8 or CP10 any more,
>> > >> if there is no shared state in the source, as exactly what you mentioned,
>> > >> "it should be no different than 

[jira] [Created] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-02-08 Thread shizhengchao (Jira)
shizhengchao created FLINK-26033:


 Summary: In KafkaConnector, when 'sink.partitioner' is configured 
as 'round-robin', it does not take effect
 Key: FLINK-26033
 URL: https://issues.apache.org/jira/browse/FLINK-26033
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.14.3, 1.13.3
Reporter: shizhengchao


In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it 
does not take effect. Flink treats 'default' and 'round-robin' as the same 
strategy.
{code:java}
public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
        ReadableConfig tableOptions, ClassLoader classLoader) {
    return tableOptions
            .getOptional(SINK_PARTITIONER)
            .flatMap(
                    (String partitioner) -> {
                        switch (partitioner) {
                            case SINK_PARTITIONER_VALUE_FIXED:
                                return Optional.of(new FlinkFixedPartitioner<>());
                            case SINK_PARTITIONER_VALUE_DEFAULT:
                            case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
                                // 'default' and 'round-robin' are treated identically:
                                // no custom partitioner, so Kafka's DefaultPartitioner is used
                                return Optional.empty();
                            // Default fallback to full class name of the partitioner.
                            default:
                                return Optional.of(
                                        initializePartitioner(partitioner, classLoader));
                        }
                    });
} {code}
Both options fall back to Kafka's DefaultPartitioner, which actually has two partitioning behaviors:
1. When there is no key: a random (sticky) partition is used
2. When there is a key: the partition is chosen by hashing the key modulo the partition count
{code:java}
// org.apache.kafka.clients.producer.internals.DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes, Object value,
        byte[] valueBytes, Cluster cluster) {
    if (keyBytes == null) {
        // random (sticky) when there is no key
        return stickyPartitionCache.partition(topic, cluster);
    }
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
} {code}
Therefore, the KafkaConnector does not actually offer a round-robin strategy. But we can borrow from Kafka's RoundRobinPartitioner:
{code:java}
public class RoundRobinPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value,
            byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
        return counter.getAndIncrement();
    }

    public void close() {}
} {code}
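
A possible Flink-side counterpart, sketched here only for illustration (the class name FlinkRoundRobinPartitioner is invented; the partition() signature is the one FlinkKafkaPartitioner already defines):
{code:java}
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

public class FlinkRoundRobinPartitioner<T> extends FlinkKafkaPartitioner<T> {

    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // ignore the key and cycle through the sink's known partitions
        int next = counter.getAndIncrement() & Integer.MAX_VALUE; // keep non-negative on overflow
        return partitions[next % partitions.length];
    }
} {code}
The SINK_PARTITIONER_VALUE_ROUND_ROBIN case above would then return Optional.of(new FlinkRoundRobinPartitioner<>()) instead of Optional.empty().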



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26032) log job info in the ContextEnvironment

2022-02-08 Thread Jing Ge (Jira)
Jing Ge created FLINK-26032:
---

 Summary: log job info in the ContextEnvironment
 Key: FLINK-26032
 URL: https://issues.apache.org/jira/browse/FLINK-26032
 Project: Flink
  Issue Type: Improvement
  Components: Client / Job Submission
Reporter: Jing Ge
Assignee: Jing Ge


In the Flink codebase, {{org.apache.flink.client.program.ContextEnvironment}} has a static 
[Logger|https://github.com/apache/flink/blob/7f9587c723057e2b6cbaf748181c8c80a7f6703d/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java#L47]
 defined in the class; however, it doesn't use it to print any logs. Instead, it prints logs with {{System.out}} and only passes the Logger to {{ShutdownHookUtil.addShutdownHook}} and {{jobExecutionResultFuture.whenComplete}} for logging hook errors. If a customer integrates the CLI ('FlinkYarnSessionCli' in their case) into a multi-threaded program to submit jobs in parallel, could this lead to log output being missed, overwritten, or reordered?

It is always helpful to log status information during the job submission process.
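
A minimal sketch of what the suggested change amounts to (illustrative class and method names, not the actual patch): route status messages through the already-declared Logger instead of System.out, so concurrent submissions are handled by the logging backend.
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ContextEnvironmentLoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(ContextEnvironmentLoggingSketch.class);

    void onJobSubmitted(String jobId) {
        // was: System.out.println("Job has been submitted with JobID " + jobId);
        LOG.info("Job has been submitted with JobID {}", jobId);
    }
}
{code}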



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26031) Support projection pushdown on keys and values in sst file readers

2022-02-08 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-26031:
---

 Summary: Support projection pushdown on keys and values in sst 
file readers
 Key: FLINK-26031
 URL: https://issues.apache.org/jira/browse/FLINK-26031
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Reporter: Caizhi Weng
 Fix For: 1.15.0


Projection pushdown is an optimization for sources. With this optimization, we
can avoid reading columns that the query does not need and thus improve performance.
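
As a sketch of what this looks like on the source side, Flink's table connector API offers SupportsProjectionPushDown (the interface and its methods are real as of Flink 1.14; the class below is a made-up example that only handles top-level projection):
{code:java}
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;

// Hypothetical holder; a real implementation would also be a ScanTableSource.
public class ProjectedFileStoreRead implements SupportsProjectionPushDown {

    private int[][] projectedFields = new int[0][];

    @Override
    public boolean supportsNestedProjection() {
        return false; // this sketch only prunes top-level columns
    }

    @Override
    public void applyProjection(int[][] projectedFields) {
        // remember which columns the planner needs; the sst file reader would
        // then materialize only these columns
        this.projectedFields = projectedFields;
    }
}
{code}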



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26030) Set FLINK_LIB_DIR to lib under working dir in YARN containers

2022-02-08 Thread Biao Geng (Jira)
Biao Geng created FLINK-26030:
-

 Summary: Set FLINK_LIB_DIR to lib under working dir in YARN 
containers
 Key: FLINK-26030
 URL: https://issues.apache.org/jira/browse/FLINK-26030
 Project: Flink
  Issue Type: Improvement
Reporter: Biao Geng






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26029) Generalize the checkpoint protocol of OperatorCoordinator.

2022-02-08 Thread Jiangjie Qin (Jira)
Jiangjie Qin created FLINK-26029:


 Summary: Generalize the checkpoint protocol of OperatorCoordinator.
 Key: FLINK-26029
 URL: https://issues.apache.org/jira/browse/FLINK-26029
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.14.3
Reporter: Jiangjie Qin


Currently the JM opens all the event valves from the OperatorCoordinator to the 
subtasks after the checkpoint barriers are sent to the Source subtasks. While 
this works for the Source operators, it unnecessarily limits general usage of 
the OperatorCoordinator for other operators.

To generalize the protocol, we can change the JM to open the event valve of each 
subtask that has finished its local checkpoint. The protocol would then become 
the following (a sketch of the valve handling follows the list):
 # Let the OC finish processing all the incoming OperatorEvents before the 
snapshot.
 # Wait until all the outgoing OperatorEvents before the snapshot are sent and 
acked.
 # Shut the event valve so no outgoing events can be sent to the subtasks.
 # Send checkpoint barriers to the Source operators.
 # Open the corresponding event valve of a subtask when the 
AcknowledgeCheckpoint message from that subtask is received.
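
A minimal sketch of the per-subtask valve handling (all names here are invented for illustration; this is not the actual JM code):
{code:java}
import java.util.Map;

// Hypothetical: instead of one global valve, track a valve per subtask and
// open it as soon as that subtask's AcknowledgeCheckpoint arrives.
class SubtaskEventValves {
    private final Map<Integer, Valve> valves; // subtask index -> valve
    private long currentCheckpointId = -1;

    SubtaskEventValves(Map<Integer, Valve> valves) {
        this.valves = valves;
    }

    void onCheckpointTriggered(long checkpointId) {
        currentCheckpointId = checkpointId;
        valves.values().forEach(Valve::shut); // step 3: shut all valves
    }

    void onAcknowledgeCheckpoint(int subtask, long checkpointId) {
        if (checkpointId == currentCheckpointId) {
            valves.get(subtask).open(); // step 5: re-open only this subtask's valve
        }
    }

    interface Valve {
        void shut();
        void open();
    }
}
{code}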



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26028) Write documentation for new PulsarSink

2022-02-08 Thread Yufan Sheng (Jira)
Yufan Sheng created FLINK-26028:
---

 Summary: Write documentation for new PulsarSink
 Key: FLINK-26028
 URL: https://issues.apache.org/jira/browse/FLINK-26028
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Pulsar
Reporter: Yufan Sheng






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26026) Test FLIP-191 PulsarSink

2022-02-08 Thread Yufan Sheng (Jira)
Yufan Sheng created FLINK-26026:
---

 Summary: Test FLIP-191 PulsarSink
 Key: FLINK-26026
 URL: https://issues.apache.org/jira/browse/FLINK-26026
 Project: Flink
  Issue Type: Sub-task
Reporter: Yufan Sheng






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26027) Add FLIP-33 metrics to new PulsarSink

2022-02-08 Thread Yufan Sheng (Jira)
Yufan Sheng created FLINK-26027:
---

 Summary: Add FLIP-33 metrics to new PulsarSink
 Key: FLINK-26027
 URL: https://issues.apache.org/jira/browse/FLINK-26027
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Pulsar
Reporter: Yufan Sheng






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26025) Replace MockPulsar with new Pulsar test tools based on PulsarStandalone

2022-02-08 Thread Yufan Sheng (Jira)
Yufan Sheng created FLINK-26025:
---

 Summary: Replace MockPulsar with new Pulsar test tools based on 
PulsarStandalone
 Key: FLINK-26025
 URL: https://issues.apache.org/jira/browse/FLINK-26025
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Pulsar
Reporter: Yufan Sheng


The old Pulsar connector tests are based on a mocked Pulsar broker, which behaves 
somewhat oddly in some cases; for example, transactions aren't supported by the 
mock. So we have to use PulsarStandalone directly.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26024) Create a PulsarSerializationSchema for better records serialization

2022-02-08 Thread Yufan Sheng (Jira)
Yufan Sheng created FLINK-26024:
---

 Summary: Create a PulsarSerializationSchema for better records 
serialization
 Key: FLINK-26024
 URL: https://issues.apache.org/jira/browse/FLINK-26024
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Pulsar
Reporter: Yufan Sheng






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26023) Create a Pulsar sink config model for matching ProducerConfigurationData

2022-02-08 Thread Yufan Sheng (Jira)
Yufan Sheng created FLINK-26023:
---

 Summary: Create a Pulsar sink config model for matching 
ProducerConfigurationData
 Key: FLINK-26023
 URL: https://issues.apache.org/jira/browse/FLINK-26023
 Project: Flink
  Issue Type: Sub-task
Reporter: Yufan Sheng






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26022) Implement at-least-once and exactly-once Pulsar Sink

2022-02-08 Thread Yufan Sheng (Jira)
Yufan Sheng created FLINK-26022:
---

 Summary: Implement at-least-once and exactly-once Pulsar Sink
 Key: FLINK-26022
 URL: https://issues.apache.org/jira/browse/FLINK-26022
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Pulsar
Reporter: Yufan Sheng


Support all three types of DeliveryGuarantee in the Pulsar sink.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26021) Pulsar topic deduplicated in both sink and source connector

2022-02-08 Thread Yufan Sheng (Jira)
Yufan Sheng created FLINK-26021:
---

 Summary: Pulsar topic deduplicated in both sink and source 
connector
 Key: FLINK-26021
 URL: https://issues.apache.org/jira/browse/FLINK-26021
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Pulsar
Reporter: Yufan Sheng
 Fix For: 1.16.0


Both topics and partitions are regarded as topics in Pulsar. We have to make 
the topic configuration more robust by deduplicating partitions and topics.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26020) Unified Pulsar Connector config model

2022-02-08 Thread Yufan Sheng (Jira)
Yufan Sheng created FLINK-26020:
---

 Summary: Unified Pulsar Connector config model
 Key: FLINK-26020
 URL: https://issues.apache.org/jira/browse/FLINK-26020
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Pulsar
Reporter: Yufan Sheng
 Fix For: 1.16.0


PulsarClient has built-in config models named ClientConfigurationData, 
ConsumerConfigurationData and ProducerConfigurationData. We don't want to 
expose all of their config options, and some config options could conflict 
with each other.

We have decided to design a new config model based on Flink's Configuration, 
which provides type checks and better integration with the Flink Table API.
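
For context, Flink Configuration-based options look like the following; the option below is an invented example, not an actual option of the connector:
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class PulsarSinkOptionsSketch {
    // hypothetical typed option: the type check (intType) happens at definition time
    public static final ConfigOption<Integer> MAX_PENDING_MESSAGES =
            ConfigOptions.key("pulsar.producer.max-pending-messages")
                    .intType()
                    .defaultValue(1000)
                    .withDescription("Maximum number of pending messages per producer.");
}
{code}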



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26019) Changelogged PriorityQueue elements recovered out-of-order

2022-02-08 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-26019:
-

 Summary: Changelogged PriorityQueue elements recovered out-of-order
 Key: FLINK-26019
 URL: https://issues.apache.org/jira/browse/FLINK-26019
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.15.0


StateChangeFormat is the class responsible for writing out changelog data.
Each chunk of data is sorted by: logId -> sequenceNumber -> keyGroup.
Sorting by sequenceNumber preserves temporal order.
Sorting by keyGroup a) puts metadata (group -1) at the beginning and b) allows 
writing the key group only once.

However, the assumption that the order of changes is preserved across key 
groups currently doesn't hold: a poll operation of InternalPriorityQueue may 
affect any group (the smallest item across all groups so far will be polled).

This results in the wrong processing-time timers being removed on recovery in 
ProcessingTimeWindowCheckpointingITCase#testAggregatingSlidingProcessingTimeWindow.

One way to solve this problem is to simply disable key-group sorting and 
grouping (and only output metadata at the beginning). 
The other is to associate the polled element with the correct key group while 
logging changes.

Both ways should work with re-scaling.
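
For illustration only, the write order described above expressed as a comparator (the type here is a stand-in, not the actual StateChangeFormat code):
{code:java}
import java.util.Comparator;

class ChangeSetSketch { // hypothetical stand-in for a logged change set
    int logId;
    long sequenceNumber;
    int keyGroup; // -1 for metadata, so it sorts first within a sequence number
}

class WriteOrder {
    static final Comparator<ChangeSetSketch> ORDER =
            Comparator.<ChangeSetSketch>comparingInt(c -> c.logId)
                    .thenComparingLong(c -> c.sequenceNumber)
                    .thenComparingInt(c -> c.keyGroup);
}
{code}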

cc: [~masteryhx], [~ym], [~yunta]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: Statefun async http request via RequestReplyFunctionBuilder

2022-02-08 Thread Galen Warren
I'm ready to pick this one up, I have some code that's working locally.
Shall I create a PR?



On Wed, Feb 2, 2022 at 3:17 PM Igal Shilman  wrote:

> Great, ping me when you would like to pick this up.
>
> For the related issue, I think that can be a good addition indeed!
>
> On Wed, Feb 2, 2022 at 8:55 PM Galen Warren 
> wrote:
>
> > Gotcha, thanks. I may be able to work on that one in a couple weeks if
> > you're looking for help.
> >
> > Unrelated question -- another thing that would be useful for me would be
> > the ability to set a maximum backoff interval in
> BoundedExponentialBackoff
> > or the async equivalent. My situation is this. I'd like to set a long
> > maxRequestDuration, so it takes a good while for Statefun to "give up"
> on a
> > function call, i.e. perhaps several hours or even days. During that time,
> > if the backoff interval doubles on each failure, those backoff intervals
> > get pretty long.
> >
> > Sometimes, in exponential backoff implementations, I've seen the concept
> of
> > a max backoff interval, i.e. once the backoff interval reaches that
> point,
> > it won't go any higher. So I could set it to, say, 60 seconds, and no
> > matter how long it would retry the function, the interval between retries
> > wouldn't be more than that.
> >
> > Do you think that would be a useful addition? I could post something to
> the
> > dev list if you want.
> >
> > On Wed, Feb 2, 2022 at 2:39 PM Igal Shilman  wrote:
> >
> > > Hi Galen,
> > > You are right, it is not possible, but there is no real reason for
> that.
> > > We should fix this, and I've created the following JIRA issue [1]
> > >
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-25933
> > >
> > > On Wed, Feb 2, 2022 at 6:30 PM Galen Warren 
> > > wrote:
> > >
> > > > Is it possible to choose the async HTTP transport using
> > > > RequestReplyFunctionBuilder? It looks to me that it is not, but I
> > wanted
> > > to
> > > > double check. Thanks.
> > > >
> > >
> >
>
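
A minimal sketch of the capped exponential backoff described in this thread (a hypothetical helper, assuming the cap semantics Galen outlines; not StateFun's actual API):
{code:java}
import java.time.Duration;

final class CappedExponentialBackoff {
    private final Duration maxBackoff;
    private Duration current;

    CappedExponentialBackoff(Duration initial, Duration maxBackoff) {
        this.current = initial;
        this.maxBackoff = maxBackoff;
    }

    Duration nextDelay() {
        Duration delay = current;
        // double the interval, but never beyond the configured cap (e.g. 60s)
        Duration doubled = current.multipliedBy(2);
        current = doubled.compareTo(maxBackoff) > 0 ? maxBackoff : doubled;
        return delay;
    }
}
{code}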


[jira] [Created] (FLINK-26018) Late events in the new KafkaSource

2022-02-08 Thread Jun Qin (Jira)
Jun Qin created FLINK-26018:
---

 Summary: Late events in the new KafkaSource
 Key: FLINK-26018
 URL: https://issues.apache.org/jira/browse/FLINK-26018
 Project: Flink
  Issue Type: Bug
Reporter: Jun Qin
 Attachments: message in kafka.txt, 
taskmanager_10.28.0.131_33249-b3370c_log

There is an issue with the new KafkaSource connector in Flink 1.14: when one 
task consumes messages from multiple topic partitions (statically created, 
timestamps in order), it may start with one partition and advance the watermark 
before the data from the other partitions comes in. In this case, the early 
messages in the other partitions may unnecessarily be considered late.

I discussed this with [~renqs]; it seems that the new KafkaSource only adds a 
partition to the {{WatermarkMultiplexer}} when it receives data from that 
partition. In contrast, FlinkKafkaConsumer adds all known partitions before it 
fetches any data.
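
For intuition, the combined watermark is the minimum across the splits registered with the multiplexer, so a split that is registered only once its data arrives cannot hold the watermark back before that point. A toy sketch (invented names, not the actual multiplexer code):
{code:java}
import java.util.List;

class CombinedWatermarkSketch {
    // the multiplexer's combined output is the min across *registered* splits only;
    // splits that exist in Kafka but are not yet registered do not participate
    static long combine(List<Long> registeredSplitWatermarks) {
        long combined = Long.MAX_VALUE;
        for (long w : registeredSplitWatermarks) {
            combined = Math.min(combined, w);
        }
        return combined;
    }
}
{code}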

Attached are two files: the messages in Kafka and the corresponding TM logs.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26017) Add debug log message when marking a job result as dirty

2022-02-08 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-26017:
-

 Summary: Add debug log message when marking a job result as dirty
 Key: FLINK-26017
 URL: https://issues.apache.org/jira/browse/FLINK-26017
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Matthias Pohl






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-212: Introduce Flink Kubernetes Operator

2022-02-08 Thread Gyula Fóra
I agree with flink-kubernetes-operator as the repo name :)
Don't have any better idea

Gyula

On Sat, Feb 5, 2022 at 2:41 AM Thomas Weise  wrote:

> Hi,
>
> Thanks for the continued feedback and discussion. Looks like we are
> ready to start a VOTE, I will initiate it shortly.
>
> In parallel it would be good to find the repository name.
>
> My suggestion would be: flink-kubernetes-operator
>
> I thought "flink-operator" could be a bit misleading since the term
> operator already has a meaning in Flink.
>
> I also considered "flink-k8s-operator" but that would be almost
> identical to existing operator implementations and could lead to
> confusion in the future.
>
> Thoughts?
>
> Thanks,
> Thomas
>
>
>
> On Fri, Feb 4, 2022 at 5:15 AM Gyula Fóra  wrote:
> >
> > Hi Danny,
> >
> > So far we have been focusing our dev efforts on the initial native
> > implementation with the team.
> > If the discussion and vote goes well for this FLIP we are looking forward
> > to contributing the initial version sometime next week (fingers crossed).
> >
> > At that point I think we can already start the dev work to support the
> > standalone mode as well, especially if you can dedicate some effort to
> > pushing that side.
> > Working together on this sounds like a great idea and we should start as
> > soon as possible! :)
> >
> > Cheers,
> > Gyula
> >
> > On Fri, Feb 4, 2022 at 2:07 PM Danny Cranmer 
> > wrote:
> >
> > > I have been discussing this one with my team. We are interested in the
> > > Standalone mode, and are willing to contribute towards the
> implementation.
> > > Potentially we can work together to support both modes in parallel?
> > >
> > > Thanks,
> > >
> > > On Wed, Feb 2, 2022 at 4:02 PM Gyula Fóra 
> wrote:
> > >
> > > > Hi Danny!
> > > >
> > > > Thanks for the feedback :)
> > > >
> > > > Versioning:
> > > > Versioning will be independent from Flink and the operator will
> depend
> > > on a
> > > > fixed flink version (in every given operator version).
> > > > This should be the exact same setup as with Stateful Functions (
> > > > https://github.com/apache/flink-statefun). So independent release
> cycle
> > > > but
> > > > still within the Flink umbrella.
> > > >
> > > > Deployment error handling:
> > > > I think that's a very good point, as general exception handling for
> the
> > > > different failure scenarios is a tricky problem. I think the
> exception
> > > > classifiers and retry strategies could avoid a lot of manual
> intervention
> > > > from the user. We will definitely need to add something like this.
> Once
> > > we
> > > > have the repo created with the initial operator code we should open
> some
> > > > tickets for this and put it on the short term roadmap!
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > > > On Wed, Feb 2, 2022 at 4:50 PM Danny Cranmer <
> dannycran...@apache.org>
> > > > wrote:
> > > >
> > > > > Hey team,
> > > > >
> > > > > Great work on the FLIP, I am looking forward to this one. I agree
> that
> > > we
> > > > > can move forward to the voting stage.
> > > > >
> > > > > I have general feedback around how we will handle job submission
> > > failure
> > > > > and retry. As discussed in the Rejected Alternatives section, we
> can
> > > use
> > > > > Java to handle job submission failures from the Flink client. It
> would
> > > be
> > > > > useful to have the ability to configure exception classifiers and
> retry
> > > > > strategy as part of operator configuration.
> > > > >
> > > > > Given this will be in a separate GitHub repository, I am curious how
> > > > > the
> > > > > versioning strategy will work in relation to the Flink version? Do
> we
> > > > have
> > > > > any other components with a similar setup I can look at? Will the
> > > > operator
> > > > > version track Flink or will it use its own versioning strategy
> with a
> > > > Flink
> > > > > version support matrix, or similar?
> > > > >
> > > > > Thanks,
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Feb 1, 2022 at 2:33 PM Márton Balassi <
> > > balassi.mar...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi team,
> > > > > >
> > > > > > Thank you for the great feedback, Thomas has updated the FLIP
> page
> > > > > > accordingly. If you are comfortable with the currently existing
> > > design
> > > > > and
> > > > > > depth in the FLIP [1] I suggest moving forward to the voting
> stage -
> > > > once
> > > > > > that reaches a positive conclusion it lets us create the separate
> > > code
> > > > > > repository under the flink project for the operator.
> > > > > >
> > > > > > I encourage everyone to keep improving the details in the
> meantime,
> > > > > however
> > > > > > I believe given the existing design and the general sentiment on
> this
> > > > > > thread that the most efficient path from here is starting the
> > > > > > implementation so that we can collectively iterate over it.
> > > > > >
> > > > > > [1]
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> 

[jira] [Created] (FLINK-26016) FileSystemLookupFunction does not produce correct results when hive table uses columnar storage

2022-02-08 Thread jinfeng (Jira)
jinfeng created FLINK-26016:
---

 Summary: FileSystemLookupFunction does not produce correct results 
when hive table uses columnar storage
 Key: FLINK-26016
 URL: https://issues.apache.org/jira/browse/FLINK-26016
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.14.3
Reporter: jinfeng


When I use a parquet hive table as the lookup table, some records cannot be 
joined. This can be reproduced by adding the following unit test to 
HiveLookupJoinITCase.

{code:java}
// create the hive table with columnar storage.
tableEnv.executeSql(
        String.format(
                "create table columnar_table (x string) STORED AS PARQUET "
                        + "tblproperties ('%s'='5min')",
                HiveOptions.LOOKUP_JOIN_CACHE_TTL.key()));

@Test
public void testLookupJoinTableWithColumnarStorage() throws Exception {
    // construct test data; as the DEFAULT_SIZE of VectorizedColumnBatch is 2048,
    // we should write at least 2048 records to the test table.
    List<Row> testData = new ArrayList<>(4096);
    for (int i = 0; i < 4096; i++) {
        testData.add(Row.of(String.valueOf(i)));
    }

    // construct test data using a values table
    TableEnvironment batchEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
    batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
    batchEnv.useCatalog(hiveCatalog.getName());
    String dataId = TestValuesTableFactory.registerData(testData);
    batchEnv.executeSql(
            String.format(
                    "create table value_source(x string, p as proctime()) with ("
                            + "'connector' = 'values', 'data-id' = '%s', 'bounded'='true')",
                    dataId));
    batchEnv.executeSql("insert overwrite columnar_table select x from value_source").await();
    TableImpl flinkTable =
            (TableImpl)
                    tableEnv.sqlQuery(
                            "select t.x as x1, c.x as x2 from value_source t "
                                    + "left join columnar_table for system_time as of t.p c "
                                    + "on t.x = c.x where c.x is null");
    List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
    assertTrue(results.size() == 0);
}
{code}

The problem may be caused by the following code: the lookup key is extracted 
from the reused row object before it is copied, so with a columnar 
(batch-reusing) reader the key can be backed by memory that is overwritten by 
subsequent reads.

{code:java}
RowData row;
while ((row = partitionReader.read(reuse)) != null) {
    count++;
    // the key is extracted from the reused row before the row is copied
    RowData key = extractLookupKey(row);
    List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
    rows.add(serializer.copy(row));
}
{code}

 
It can be fixed with the following modification (copy the row first, then extract the key from the copy):
{code:java}
RowData row;
while ((row = partitionReader.read(reuse)) != null) {
    count++;
    // copy first, so that neither the cached key nor the cached row is backed
    // by the reused object
    RowData rowData = serializer.copy(row);
    RowData key = extractLookupKey(rowData);
    List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
    rows.add(rowData);
}
{code}




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26015) FileSystemJobResultStore fails to access Minio

2022-02-08 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-26015:
-

 Summary: FileSystemJobResultStore fails to access Minio
 Key: FLINK-26015
 URL: https://issues.apache.org/jira/browse/FLINK-26015
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.15.0
Reporter: Matthias Pohl


We're experiencing issues with accessing Minio-backed filesystems (probably S3 
object stores in general). The base directory appears not to be created.
{code:java}
2022-02-08 13:13:31,682 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal error 
occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of 
globally-terminated jobs from JobResultStore
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
 ~[?:1.8.0_322]
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
 [?:1.8.0_322]
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
 [?:1.8.0_322]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_322]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_322]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve 
JobResults of globally-terminated jobs from JobResultStore
at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:186)
 ~[flink-dist-flink-nightly.jar:flink-nightly]
at 
org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
 ~[flink-dist-flink-nightly.jar:flink-nightly]
at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:178)
 ~[flink-dist-flink-nightly.jar:flink-nightly]
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
 ~[?:1.8.0_322]
... 3 more
Caused by: java.io.FileNotFoundException: No such file or directory: 
s3://vvc-eu-west-1-dev-store/myorg/myscope/3aa35e65-df86-4b16-8cc7-7c75af879317-test-job-name-a/ha/job-result-store/default
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2344) 
~[?:?]
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2226)
 ~[?:?]
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2160) 
~[?:?]
at 
org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
 ~[?:?]
at 
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.getFileStatus(PluginFileSystemFactory.java:105)
 ~[flink-dist-flink-nightly.jar:flink-nightly]
at 
org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:158)
 ~[flink-dist-flink-nightly.jar:flink-nightly]
at 
org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118)
 ~[flink-dist-flink-nightly.jar:flink-nightly]
at 
org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100)
 ~[flink-dist-flink-nightly.jar:flink-nightly]
at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:184)
 ~[flink-dist-flink-nightly.jar:flink-nightly]
at 
org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
 ~[flink-dist-flink-nightly.jar:flink-nightly]
at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:178)
 ~[flink-dist-flink-nightly.jar:flink-nightly]
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
 ~[?:1.8.0_322]
... 3 more {code}
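
A minimal sketch of the direction a fix could take (class and method names here are hypothetical, not the actual implementation): on object stores such as S3/Minio a "directory" only exists once an object has been written under its prefix, so a missing base path should be treated as "no results yet" rather than as an error.

{code:java}
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

final class JobResultStoreListing {

    /** Lists the job-result-store directory, tolerating its absence on object stores. */
    static List<FileStatus> listDirtyResults(FileSystem fs, Path basePath) throws IOException {
        if (!fs.exists(basePath)) {
            // nothing has been stored yet; do not fail with FileNotFoundException
            return Collections.emptyList();
        }
        FileStatus[] statuses = fs.listStatus(basePath);
        return statuses == null ? Collections.emptyList() : Arrays.asList(statuses);
    }
}
{code}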



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26014) Document how to use the working directory for faster local recoveries

2022-02-08 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-26014:
-

 Summary: Document how to use the working directory for faster 
local recoveries
 Key: FLINK-26014
 URL: https://issues.apache.org/jira/browse/FLINK-26014
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.15.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.15.0


After having implemented FLIP-198 and FLIP-201, users can now use faster 
TaskManager failover when using local recovery with persisted volumes. I 
suggest adding documentation that explains how to configure Flink to make use 
of it.
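
A configuration sketch of what such documentation might cover (key names follow FLIP-198/FLIP-201 and should be verified against the release docs; paths and ids below are placeholders):

{noformat}
# flink-conf.yaml

# Place the TaskManager working directory on a volume that survives restarts.
process.taskmanager.working-dir: /mnt/persisted/flink

# Give the TaskManager a deterministic id so it finds its old working directory.
taskmanager.resource-id: taskmanager-0

# Enable local recovery so that state can be restored from the working directory.
state.backend.local-recovery: true
{noformat}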



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26013) Develop ArchUnit test for Flink core and runtime

2022-02-08 Thread Jing Ge (Jira)
Jing Ge created FLINK-26013:
---

 Summary: Develop ArchUnit test for Flink core and runtime
 Key: FLINK-26013
 URL: https://issues.apache.org/jira/browse/FLINK-26013
 Project: Flink
  Issue Type: Improvement
  Components: Test Infrastructure
Reporter: Jing Ge
Assignee: Jing Ge






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26012) Develop ArchUnit test for Flink table

2022-02-08 Thread Jing Ge (Jira)
Jing Ge created FLINK-26012:
---

 Summary: Develop ArchUnit test for Flink table
 Key: FLINK-26012
 URL: https://issues.apache.org/jira/browse/FLINK-26012
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Reporter: Jing Ge
Assignee: Jing Ge






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26011) Develop ArchUnit test for formats

2022-02-08 Thread Jing Ge (Jira)
Jing Ge created FLINK-26011:
---

 Summary: Develop ArchUnit test for formats
 Key: FLINK-26011
 URL: https://issues.apache.org/jira/browse/FLINK-26011
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Jing Ge
Assignee: Jing Ge






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26010) Develop ArchUnit test for filesystems

2022-02-08 Thread Jing Ge (Jira)
Jing Ge created FLINK-26010:
---

 Summary: Develop ArchUnit test for filesystems
 Key: FLINK-26010
 URL: https://issues.apache.org/jira/browse/FLINK-26010
 Project: Flink
  Issue Type: Improvement
  Components: FileSystems
Reporter: Jing Ge
Assignee: Jing Ge






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26009) ArchUnit for Flink

2022-02-08 Thread Jing Ge (Jira)
Jing Ge created FLINK-26009:
---

 Summary: ArchUnit for Flink
 Key: FLINK-26009
 URL: https://issues.apache.org/jira/browse/FLINK-26009
 Project: Flink
  Issue Type: Improvement
Reporter: Jing Ge


This is the umbrella ticket for all ArchUnit-related tasks.
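
For illustration, an ArchUnit rule looks like the following (the concrete rules are left to the subtasks; this one is invented and assumes the archunit-junit5 artifact):

{code:java}
import com.tngtech.archunit.junit.AnalyzeClasses;
import com.tngtech.archunit.junit.ArchTest;
import com.tngtech.archunit.lang.ArchRule;

import static com.tngtech.archunit.lang.syntax.ArchRuleDefinition.noClasses;

@AnalyzeClasses(packages = "org.apache.flink")
public class ExampleArchitectureTest {

    // example rule: flink-core must not reach into the runtime
    @ArchTest
    public static final ArchRule CORE_DOES_NOT_DEPEND_ON_RUNTIME =
            noClasses()
                    .that().resideInAPackage("org.apache.flink.core..")
                    .should().dependOnClassesThat()
                    .resideInAPackage("org.apache.flink.runtime..");
}
{code}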



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26008) [FLIP-171] Update Kinesalite docker container reference

2022-02-08 Thread Danny Cranmer (Jira)
Danny Cranmer created FLINK-26008:
-

 Summary: [FLIP-171] Update Kinesalite docker container reference 
 Key: FLINK-26008
 URL: https://issues.apache.org/jira/browse/FLINK-26008
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Kinesis
Reporter: Danny Cranmer


We are currently referencing the {{:latest}} Kinesalite image for tests. Update 
this to a pinned, more recent version:
- https://github.com/apache/flink/pull/18661/files#r801531468
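
For illustration, pinning the image with Testcontainers could look like the sketch below (the repository and tag shown are assumptions; the exact tag is settled in the linked PR):

{code:java}
import org.testcontainers.utility.DockerImageName;

public final class KinesaliteImage {
    // pin an explicit tag instead of relying on :latest
    public static final DockerImageName KINESALITE =
            DockerImageName.parse("instructure/kinesalite").withTag("3.3.3");
}
{code}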



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] Checkpointing (partially) failing jobs

2022-02-08 Thread 丛鹏
Hi guys, if I understand it correctly, will only some of the checkpoints be
recovered when there is an error in a Flink batch job?

Piotr Nowojski wrote on Tue, Feb 8, 2022 at 19:05:

> Hi,
>
> I second Chesnay's comment and would like to better understand the
> motivation behind this. At the surface it sounds to me like this might
> require quite a bit of work for a very narrow use case.
>
> At the same time I have a feeling that Yuan, you are mixing this feature
> request (checkpointing subgraphs/pipeline regions independently) and a very
> very different issue of "task local checkpoints"? Those problems are kind
> of similar, but not quite.
>
> Best,
> Piotrek
>
> On Tue, Feb 8, 2022 at 11:44, Chesnay Schepler wrote:
>
> > Could someone expand on these operational issues you're facing when
> > achieving this via separate jobs?
> >
> > I feel like we're skipping a step, arguing about solutions without even
> > having discussed the underlying problem.
> >
> > On 08/02/2022 11:25, Gen Luo wrote:
> > > Hi,
> > >
> > > @Yuan
> > > Do you mean that there should be no shared state between source
> subtasks?
> > > Sharing state between checkpoints of a specific subtask should be fine.
> > >
> > > Sharing state between subtasks of a task can be an issue, no matter
> > whether
> > > it's a source. That's also what I was afraid of in the previous
> replies.
> > In
> > > one word, if the behavior of a pipeline region can somehow influence
> the
> > > state of other pipeline regions, their checkpoints have to be aligned
> > > before rescaling.
> > >
> > > On Tue, Feb 8, 2022 at 5:27 PM Yuan Mei 
> wrote:
> > >
> > >> Hey Folks,
> > >>
> > >> Thanks for the discussion!
> > >>
> > >> *Motiviation and use cases*
> > >> I think motiviation and use cases are very clear and I do not have
> > doubts
> > >> on this part.
> > >> A typical use case is ETL with two-phase-commit, hundreds of
> partitions
> > can
> > >> be blocked by a single straggler (a single task's checkpoint abortion
> > can
> > >> affect all, not necessary failure).
> > >>
> > >> *Source offset redistribution*
> > >> As for the known sources & implementation for Flink, I can not find a
> > case
> > >> that does not work, *for now*.
> > >> I need to dig a bit more: how splits are tracked assigned, not
> > successfully
> > >> processed, succesffully processed e.t.c.
> > >> I guess it is a single shared source OPCoordinator. And how this
> > *shared*
> > >> state (between tasks) is preserved?
> > >>
> > >> *Input partition/splits treated completely independent from each
> other*
> > >> This part I am still not sure, as mentioned if we have shared state of
> > >> source in the above section.
> > >>
> > >> To Thomas:
> > >>> In Yuan's example, is there a reason why CP8 could not be promoted to
> > >>> CP10 by the coordinator for PR2 once it receives the notification
> that
> > >>> CP10 did not complete? It appears that should be possible since in
> its
> > >>> effect it should be no different than no data processed between CP8
> > >>>   and CP10?
> > >> Not sure what "promoted" means here, but
> > >> 1. I guess it does not matter whether it is CP8 or CP10 any more,
> > >> if no shared state in source, as exactly what you meantinoed,
> > >> "it should be no different than no data processed between CP8 and
> CP10"
> > >>
> > >> 2. I've noticed that from this question there is a gap between
> > >> "*allow aborted/failed checkpoint in independent sub-graph*" and
> > >> my intention: "*independent sub-graph checkpointing indepently*"
> > >>
> > >> Best
> > >> Yuan
> > >>
> > >>
> > >> On Tue, Feb 8, 2022 at 11:34 AM Gen Luo  wrote:
> > >>
> > >>> Hi,
> > >>>
> > >>> I'm thinking about Yuan's case. Let's assume that the case is running
> > in
> > >>> current Flink:
> > >>> 1. CP8 finishes
> > >>> 2. For some reason, PR2 stops consuming records from the source (but
> is
> > >> not
> > >>> stuck), and PR1 continues consuming new records.
> > >>> 3. CP9 and CP10 finish
> > >>> 4. PR2 starts to consume quickly to catch up with PR1, and reaches
> the
> > >> same
> > >>> final status with that in Yuan's case before CP11 starts.
> > >>>
> > >>> I support that in this case, the status of the job can be the same as
> > in
> > >>> Yuan's case, and the snapshot (including source states) taken at CP10
> > >>> should be the same as the composed global snapshot in Yuan's case,
> > which
> > >> is
> > >>> combining CP10 of PR1 and CP8 of PR2. This should be true if neither
> > >> failed
> > >>> checkpointing nor uncommitted consuming have side effects, both of
> > which
> > >>> can break the exactly-once semantics when replaying. So I think there
> > >>> should be no difference between rescaling the combined global
> snapshot
> > or
> > >>> the globally taken one, i.e. if the input partitions are not
> > independent,
> > >>> we are probably not able to rescale the source state in the current
> > Flink
> > >>> eiter.
> > >>>
> > >>> And @Thomas, I do agree that the operational burden is
> > >>> 

[jira] [Created] (FLINK-26007) Use durationType instead of stringType for time related config options

2022-02-08 Thread Marios Trivyzas (Jira)
Marios Trivyzas created FLINK-26007:
---

 Summary: Use durationType instead of stringType for time related 
config options
 Key: FLINK-26007
 URL: https://issues.apache.org/jira/browse/FLINK-26007
 Project: Flink
  Issue Type: Improvement
Reporter: Marios Trivyzas


Check and change all relevant config options so that we have a well-typed, 
well-defined value instead of parsing a string.

*NOTE:* This is a breaking change!

 

e.g.: 
{noformat}
AkkaOptions#WATCH_HEARTBEAT_PAUSE

public static final ConfigOption<String> WATCH_HEARTBEAT_PAUSE =
        ConfigOptions.key("akka.watch.heartbeat.pause")
                .defaultValue("60 s")
                .withDescription(
                        Description.builder()
                                .text(
                                        "Acceptable heartbeat pause for Akka’s DeathWatch mechanism. A low value does not allow an"
                                                + " irregular heartbeat. If TaskManagers are wrongly marked dead because of lost or delayed"
                                                + " heartbeat messages, then you should increase this value or decrease akka.watch.heartbeat.interval."
                                                + " Higher value increases the time to detect a dead TaskManager. A thorough description of Akka’s"
                                                + " DeathWatch can be found %s",
                                        link(
                                                "http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector",
                                                "here"))
                                .build());
{noformat}
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java#L300
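
A sketch of the proposed replacement for the same option, assuming the typed {{ConfigOptions}} builder (description shortened here):

{code:java}
import java.time.Duration;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public static final ConfigOption<Duration> WATCH_HEARTBEAT_PAUSE =
        ConfigOptions.key("akka.watch.heartbeat.pause")
                .durationType()
                .defaultValue(Duration.ofSeconds(60))
                .withDescription(
                        "Acceptable heartbeat pause for Akka's DeathWatch mechanism.");
{code}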

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26006) KinesisFirehoseSinkITCase leaks resources

2022-02-08 Thread Jira
David Morávek created FLINK-26006:
-

 Summary: KinesisFirehoseSinkITCase leaks resources
 Key: FLINK-26006
 URL: https://issues.apache.org/jira/browse/FLINK-26006
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kinesis
Reporter: David Morávek
 Fix For: 1.15.0


Both the KinesisFirehoseSinkITCase and the KinesisFirehose sink in general don't 
close the underlying AWS resources.
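
A minimal sketch of the direction a fix could take (class and field names are hypothetical): AWS SDK v2 clients and their HTTP transport implement {{SdkAutoCloseable}} and must be closed explicitly.

{code:java}
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;

/** Sketch only: whoever owns the client and transport must close both. */
final class FirehoseResources implements AutoCloseable {

    private final SdkAsyncHttpClient httpClient;
    private final FirehoseAsyncClient firehoseClient;

    FirehoseResources(SdkAsyncHttpClient httpClient, FirehoseAsyncClient firehoseClient) {
        this.httpClient = httpClient;
        this.firehoseClient = firehoseClient;
    }

    @Override
    public void close() {
        firehoseClient.close(); // releases SDK-managed resources
        httpClient.close();     // releases the underlying connections/event loops
    }
}
{code}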



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] Checkpointing (partially) failing jobs

2022-02-08 Thread Piotr Nowojski
Hi,

I second Chesnay's comment and would like to better understand the
motivation behind this. At the surface it sounds to me like this might
require quite a bit of work for a very narrow use case.

At the same time I have a feeling that Yuan, you are mixing this feature
request (checkpointing subgraphs/pipeline regions independently) and a very
very different issue of "task local checkpoints"? Those problems are kind
of similar, but not quite.

Best,
Piotrek

On Tue, Feb 8, 2022 at 11:44, Chesnay Schepler wrote:

> Could someone expand on these operational issues you're facing when
> achieving this via separate jobs?
>
> I feel like we're skipping a step, arguing about solutions without even
> having discussed the underlying problem.
>
> On 08/02/2022 11:25, Gen Luo wrote:
> > Hi,
> >
> > @Yuan
> > Do you mean that there should be no shared state between source subtasks?
> > Sharing state between checkpoints of a specific subtask should be fine.
> >
> > Sharing state between subtasks of a task can be an issue, no matter
> whether
> > it's a source. That's also what I was afraid of in the previous replies.
> In
> > one word, if the behavior of a pipeline region can somehow influence the
> > state of other pipeline regions, their checkpoints have to be aligned
> > before rescaling.
> >
> > On Tue, Feb 8, 2022 at 5:27 PM Yuan Mei  wrote:
> >
> >> Hey Folks,
> >>
> >> Thanks for the discussion!
> >>
> >> *Motiviation and use cases*
> >> I think motiviation and use cases are very clear and I do not have
> doubts
> >> on this part.
> >> A typical use case is ETL with two-phase-commit, hundreds of partitions
> can
> >> be blocked by a single straggler (a single task's checkpoint abortion
> can
> >> affect all, not necessary failure).
> >>
> >> *Source offset redistribution*
> >> As for the known sources & implementation for Flink, I can not find a
> case
> >> that does not work, *for now*.
> >> I need to dig a bit more: how splits are tracked assigned, not
> successfully
> >> processed, succesffully processed e.t.c.
> >> I guess it is a single shared source OPCoordinator. And how this
> *shared*
> >> state (between tasks) is preserved?
> >>
> >> *Input partition/splits treated completely independent from each other*
> >> This part I am still not sure, as mentioned if we have shared state of
> >> source in the above section.
> >>
> >> To Thomas:
> >>> In Yuan's example, is there a reason why CP8 could not be promoted to
> >>> CP10 by the coordinator for PR2 once it receives the notification that
> >>> CP10 did not complete? It appears that should be possible since in its
> >>> effect it should be no different than no data processed between CP8
> >>>   and CP10?
> >> Not sure what "promoted" means here, but
> >> 1. I guess it does not matter whether it is CP8 or CP10 any more,
> >> if no shared state in source, as exactly what you meantinoed,
> >> "it should be no different than no data processed between CP8 and CP10"
> >>
> >> 2. I've noticed that from this question there is a gap between
> >> "*allow aborted/failed checkpoint in independent sub-graph*" and
> >> my intention: "*independent sub-graph checkpointing indepently*"
> >>
> >> Best
> >> Yuan
> >>
> >>
> >> On Tue, Feb 8, 2022 at 11:34 AM Gen Luo  wrote:
> >>
> >>> Hi,
> >>>
> >>> I'm thinking about Yuan's case. Let's assume that the case is running
> in
> >>> current Flink:
> >>> 1. CP8 finishes
> >>> 2. For some reason, PR2 stops consuming records from the source (but is
> >> not
> >>> stuck), and PR1 continues consuming new records.
> >>> 3. CP9 and CP10 finish
> >>> 4. PR2 starts to consume quickly to catch up with PR1, and reaches the
> >> same
> >>> final status with that in Yuan's case before CP11 starts.
> >>>
> >>> I support that in this case, the status of the job can be the same as
> in
> >>> Yuan's case, and the snapshot (including source states) taken at CP10
> >>> should be the same as the composed global snapshot in Yuan's case,
> which
> >> is
> >>> combining CP10 of PR1 and CP8 of PR2. This should be true if neither
> >> failed
> >>> checkpointing nor uncommitted consuming have side effects, both of
> which
> >>> can break the exactly-once semantics when replaying. So I think there
> >>> should be no difference between rescaling the combined global snapshot
> or
> >>> the globally taken one, i.e. if the input partitions are not
> independent,
> >>> we are probably not able to rescale the source state in the current
> Flink
> >>> eiter.
> >>>
> >>> And @Thomas, I do agree that the operational burden is
> >>> significantly reduced, while I'm a little afraid that checkpointing the
> >>> subgraphs individually may increase most of the runtime overhead back
> >>> again. Maybe we can find a better way to implement this.
> >>>
> >>> On Tue, Feb 8, 2022 at 5:11 AM Thomas Weise  wrote:
> >>>
>  Hi,
> 
>  Thanks for opening this discussion! The proposed enhancement would be
>  interesting for use cases in our infrastructure as 

Re: [DISCUSS] Checkpointing (partially) failing jobs

2022-02-08 Thread Chesnay Schepler
Could someone expand on these operational issues you're facing when 
achieving this via separate jobs?


I feel like we're skipping a step, arguing about solutions without even 
having discussed the underlying problem.


On 08/02/2022 11:25, Gen Luo wrote:

Hi,

@Yuan
Do you mean that there should be no shared state between source subtasks?
Sharing state between checkpoints of a specific subtask should be fine.

Sharing state between subtasks of a task can be an issue, no matter whether
it's a source. That's also what I was afraid of in the previous replies. In
one word, if the behavior of a pipeline region can somehow influence the
state of other pipeline regions, their checkpoints have to be aligned
before rescaling.

On Tue, Feb 8, 2022 at 5:27 PM Yuan Mei  wrote:


Hey Folks,

Thanks for the discussion!

*Motiviation and use cases*
I think motiviation and use cases are very clear and I do not have doubts
on this part.
A typical use case is ETL with two-phase-commit, hundreds of partitions can
be blocked by a single straggler (a single task's checkpoint abortion can
affect all, not necessary failure).

*Source offset redistribution*
As for the known sources & implementation for Flink, I can not find a case
that does not work, *for now*.
I need to dig a bit more: how splits are tracked assigned, not successfully
processed, succesffully processed e.t.c.
I guess it is a single shared source OPCoordinator. And how this *shared*
state (between tasks) is preserved?

*Input partition/splits treated completely independent from each other*
This part I am still not sure, as mentioned if we have shared state of
source in the above section.

To Thomas:

In Yuan's example, is there a reason why CP8 could not be promoted to
CP10 by the coordinator for PR2 once it receives the notification that
CP10 did not complete? It appears that should be possible since in its
effect it should be no different than no data processed between CP8
  and CP10?

Not sure what "promoted" means here, but
1. I guess it does not matter whether it is CP8 or CP10 any more,
if no shared state in source, as exactly what you meantinoed,
"it should be no different than no data processed between CP8 and CP10"

2. I've noticed that from this question there is a gap between
"*allow aborted/failed checkpoint in independent sub-graph*" and
my intention: "*independent sub-graph checkpointing indepently*"

Best
Yuan


On Tue, Feb 8, 2022 at 11:34 AM Gen Luo  wrote:


Hi,

I'm thinking about Yuan's case. Let's assume that the case is running in
current Flink:
1. CP8 finishes
2. For some reason, PR2 stops consuming records from the source (but is

not

stuck), and PR1 continues consuming new records.
3. CP9 and CP10 finish
4. PR2 starts to consume quickly to catch up with PR1, and reaches the

same

final status with that in Yuan's case before CP11 starts.

I support that in this case, the status of the job can be the same as in
Yuan's case, and the snapshot (including source states) taken at CP10
should be the same as the composed global snapshot in Yuan's case, which

is

combining CP10 of PR1 and CP8 of PR2. This should be true if neither

failed

checkpointing nor uncommitted consuming have side effects, both of which
can break the exactly-once semantics when replaying. So I think there
should be no difference between rescaling the combined global snapshot or
the globally taken one, i.e. if the input partitions are not independent,
we are probably not able to rescale the source state in the current Flink
eiter.

And @Thomas, I do agree that the operational burden is
significantly reduced, while I'm a little afraid that checkpointing the
subgraphs individually may increase most of the runtime overhead back
again. Maybe we can find a better way to implement this.

On Tue, Feb 8, 2022 at 5:11 AM Thomas Weise  wrote:


Hi,

Thanks for opening this discussion! The proposed enhancement would be
interesting for use cases in our infrastructure as well.

There are scenarios where it makes sense to have multiple disconnected
subgraphs in a single job because it can significantly reduce the
operational burden as well as the runtime overhead. Since we allow
subgraphs to recover independently, then why not allow them to make
progress independently also, which would imply that checkpointing must
succeed for non affected subgraphs as certain behavior is tied to
checkpoint completion, like Kafka offset commit, file output etc.

As for source offset redistribution, offset/position needs to be tied
to splits (in FLIP-27) and legacy sources. (It applies to both Kafka
and Kinesis legacy sources and FLIP-27 Kafka source.). With the new
source framework, it would be hard to implement a source with correct
behavior that does not track the position along with the split.

In Yuan's example, is there a reason why CP8 could not be promoted to
CP10 by the coordinator for PR2 once it receives the notification that
CP10 did not complete? It appears that should be possible since 

[jira] [Created] (FLINK-26005) TableEnvironment.createTemporarySystemFunction cause NPE when using leftOuterLateralJoin

2022-02-08 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-26005:
-

 Summary: TableEnvironment.createTemporarySystemFunction cause NPE 
when using leftOuterLateralJoin
 Key: FLINK-26005
 URL: https://issues.apache.org/jira/browse/FLINK-26005
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.14.3, 1.15.0
Reporter: Till Rohrmann


When trying out {{Table.leftOuterLateralJoin}} with a table function that 
was registered via {{TableEnvironment.createTemporarySystemFunction}}, the 
system failed with:

{code}
Exception in thread "main" java.lang.NullPointerException
at 
org.apache.calcite.tools.RelBuilder$Frame.(RelBuilder.java:3332)
at 
org.apache.calcite.tools.RelBuilder$Frame.(RelBuilder.java:3317)
at org.apache.calcite.tools.RelBuilder.push(RelBuilder.java:282)
at 
org.apache.calcite.tools.RelBuilder.functionScan(RelBuilder.java:1197)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:309)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:154)
at 
org.apache.flink.table.operations.CalculatedQueryOperation.accept(CalculatedQueryOperation.java:94)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:151)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:133)
at 
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:87)
at 
org.apache.flink.table.operations.CalculatedQueryOperation.accept(CalculatedQueryOperation.java:94)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:150)
at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4390)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:150)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:133)
at 
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:62)
at 
org.apache.flink.table.operations.JoinQueryOperation.accept(JoinQueryOperation.java:115)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:150)
at 
java.base/java.util.Collections$SingletonList.forEach(Collections.java:4854)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:150)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:133)
at 
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
at 
org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:76)
at 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:184)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:214)
at 
org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:182)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:182)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:805)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274)
at 
org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:601)
{code}
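
A hypothetical repro sketch based on the description (table and function names are invented; the Table API method is spelled {{leftOuterJoinLateral}}):

{code:java}
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

tEnv.createTemporarySystemFunction("split", SplitFunction.class);
tEnv.from("t")
        .leftOuterJoinLateral(call("split", $("line")))
        .execute()
        .print(); // fails with the NullPointerException above

// the deprecated path reportedly works (registerFunction for table functions
// lives on StreamTableEnvironment):
// tEnv.registerFunction("split", new SplitFunction());
{code}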

Interestingly, when using the deprecated {{TableEnvironment.registerFunction}}, 
it worked. Timo mentioned that this could be caused by a missing integration 
into the new type 

Re: [DISCUSS] Checkpointing (partially) failing jobs

2022-02-08 Thread Gen Luo
Hi,

@Yuan
Do you mean that there should be no shared state between source subtasks?
Sharing state between checkpoints of a specific subtask should be fine.

Sharing state between subtasks of a task can be an issue, no matter whether
it's a source. That's also what I was afraid of in the previous replies. In
short, if the behavior of a pipeline region can somehow influence the
state of other pipeline regions, their checkpoints have to be aligned
before rescaling.

On Tue, Feb 8, 2022 at 5:27 PM Yuan Mei  wrote:

> Hey Folks,
>
> Thanks for the discussion!
>
> *Motiviation and use cases*
> I think motiviation and use cases are very clear and I do not have doubts
> on this part.
> A typical use case is ETL with two-phase-commit, hundreds of partitions can
> be blocked by a single straggler (a single task's checkpoint abortion can
> affect all, not necessary failure).
>
> *Source offset redistribution*
> As for the known sources & implementation for Flink, I can not find a case
> that does not work, *for now*.
> I need to dig a bit more: how splits are tracked assigned, not successfully
> processed, succesffully processed e.t.c.
> I guess it is a single shared source OPCoordinator. And how this *shared*
> state (between tasks) is preserved?
>
> *Input partition/splits treated completely independent from each other*
> This part I am still not sure, as mentioned if we have shared state of
> source in the above section.
>
> To Thomas:
> > In Yuan's example, is there a reason why CP8 could not be promoted to
> > CP10 by the coordinator for PR2 once it receives the notification that
> > CP10 did not complete? It appears that should be possible since in its
> > effect it should be no different than no data processed between CP8
> >  and CP10?
>
> Not sure what "promoted" means here, but
> 1. I guess it does not matter whether it is CP8 or CP10 any more,
> if no shared state in source, as exactly what you meantinoed,
> "it should be no different than no data processed between CP8 and CP10"
>
> 2. I've noticed that from this question there is a gap between
> "*allow aborted/failed checkpoint in independent sub-graph*" and
> my intention: "*independent sub-graph checkpointing indepently*"
>
> Best
> Yuan
>
>
> On Tue, Feb 8, 2022 at 11:34 AM Gen Luo  wrote:
>
> > Hi,
> >
> > I'm thinking about Yuan's case. Let's assume that the case is running in
> > current Flink:
> > 1. CP8 finishes
> > 2. For some reason, PR2 stops consuming records from the source (but is
> not
> > stuck), and PR1 continues consuming new records.
> > 3. CP9 and CP10 finish
> > 4. PR2 starts to consume quickly to catch up with PR1, and reaches the
> same
> > final status with that in Yuan's case before CP11 starts.
> >
> > I support that in this case, the status of the job can be the same as in
> > Yuan's case, and the snapshot (including source states) taken at CP10
> > should be the same as the composed global snapshot in Yuan's case, which
> is
> > combining CP10 of PR1 and CP8 of PR2. This should be true if neither
> failed
> > checkpointing nor uncommitted consuming have side effects, both of which
> > can break the exactly-once semantics when replaying. So I think there
> > should be no difference between rescaling the combined global snapshot or
> > the globally taken one, i.e. if the input partitions are not independent,
> > we are probably not able to rescale the source state in the current Flink
> > eiter.
> >
> > And @Thomas, I do agree that the operational burden is
> > significantly reduced, while I'm a little afraid that checkpointing the
> > subgraphs individually may increase most of the runtime overhead back
> > again. Maybe we can find a better way to implement this.
> >
> > On Tue, Feb 8, 2022 at 5:11 AM Thomas Weise  wrote:
> >
> > > Hi,
> > >
> > > Thanks for opening this discussion! The proposed enhancement would be
> > > interesting for use cases in our infrastructure as well.
> > >
> > > There are scenarios where it makes sense to have multiple disconnected
> > > subgraphs in a single job because it can significantly reduce the
> > > operational burden as well as the runtime overhead. Since we allow
> > > subgraphs to recover independently, then why not allow them to make
> > > progress independently also, which would imply that checkpointing must
> > > succeed for non affected subgraphs as certain behavior is tied to
> > > checkpoint completion, like Kafka offset commit, file output etc.
> > >
> > > As for source offset redistribution, offset/position needs to be tied
> > > to splits (in FLIP-27) and legacy sources. (It applies to both Kafka
> > > and Kinesis legacy sources and FLIP-27 Kafka source.). With the new
> > > source framework, it would be hard to implement a source with correct
> > > behavior that does not track the position along with the split.
> > >
> > > In Yuan's example, is there a reason why CP8 could not be promoted to
> > > CP10 by the coordinator for PR2 once it receives the notification that
> > > 

[jira] [Created] (FLINK-26004) Introduce ForwardForLocalKeyByPartitioner

2022-02-08 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-26004:
--

 Summary: Introduce ForwardForLocalKeyByPartitioner
 Key: FLINK-26004
 URL: https://issues.apache.org/jira/browse/FLINK-26004
 Project: Flink
  Issue Type: Sub-task
Reporter: Lijie Wang


If there are multiple consecutive identical groupBy (i.e. keyBy) operations, the 
SQL planner will change all but the first one to use a forward partitioner, so 
that these operators can be chained to reduce unnecessary shuffles.

However, sometimes the local keyBy operators are not chained (e.g. with multiple 
inputs), and this kind of forward partitioner will turn into a forward job 
edge. These forward edges still carry the local keyBy assumption, so they 
cannot be changed into rescale/rebalance edges; otherwise it can lead to 
incorrect results. This prevents the adaptive batch scheduler from determining 
the parallelism of other downstream job vertices of forward edges (see FLINK-25046).

To solve this, I propose to introduce a new 
{{ForwardForLocalKeyByPartitioner}}. When the SQL planner optimizes the case of 
multiple consecutive identical groupBy operations, it should use the proposed 
partitioner, so that the runtime framework can further decide whether the 
partitioner can be changed to hash or not.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: Email from 高大余

2022-02-08 Thread Caizhi Weng
Hi!

Thanks for your interest in contributing to Flink! Currently there is no
need for (and there does not actually exist) a contributor permission. Feel
free to open or reply to a JIRA ticket you're interested in, or to discuss
with others on the mailing list. If you'd like to be assigned a JIRA
ticket, please reply to that ticket and a committer will assign it to you.

高大余 <18656037...@163.com> wrote on Tue, Feb 8, 2022 at 17:25:

> Hi Guys,
>
>
>
> I want to contribute to Apache Flink.
>
> Would you please give me the permission as a contributor?
>
> My JIRA ID is dygao.


Re: [DISCUSS] Checkpointing (partially) failing jobs

2022-02-08 Thread Yuan Mei
Hey Folks,

Thanks for the discussion!

*Motivation and use cases*
I think the motivation and use cases are very clear and I do not have doubts
on this part.
A typical use case is ETL with two-phase-commit, where hundreds of partitions
can be blocked by a single straggler (a single task's checkpoint abortion can
affect all, not necessarily a failure).

*Source offset redistribution*
As for the known sources & implementations for Flink, I cannot find a case
that does not work, *for now*.
I need to dig a bit more: how splits are tracked as assigned, not successfully
processed, successfully processed, etc.
I guess it is a single shared source OPCoordinator. And how is this *shared*
state (between tasks) preserved?

*Input partitions/splits treated as completely independent from each other*
This part I am still not sure about, given the possible shared source state
mentioned in the section above.

To Thomas:
> In Yuan's example, is there a reason why CP8 could not be promoted to
> CP10 by the coordinator for PR2 once it receives the notification that
> CP10 did not complete? It appears that should be possible since in its
> effect it should be no different than no data processed between CP8
>  and CP10?

Not sure what "promoted" means here, but
1. I guess it does not matter whether it is CP8 or CP10 any more,
if there is no shared state in the source, exactly as you mentioned:
"it should be no different than no data processed between CP8 and CP10"

2. I've noticed that from this question there is a gap between
"*allow aborted/failed checkpoint in independent sub-graph*" and
my intention: "*independent sub-graphs checkpointing independently*"

Best
Yuan


On Tue, Feb 8, 2022 at 11:34 AM Gen Luo  wrote:

> Hi,
>
> I'm thinking about Yuan's case. Let's assume that the case is running in
> current Flink:
> 1. CP8 finishes
> 2. For some reason, PR2 stops consuming records from the source (but is not
> stuck), and PR1 continues consuming new records.
> 3. CP9 and CP10 finish
> 4. PR2 starts to consume quickly to catch up with PR1, and reaches the same
> final status with that in Yuan's case before CP11 starts.
>
> I support that in this case, the status of the job can be the same as in
> Yuan's case, and the snapshot (including source states) taken at CP10
> should be the same as the composed global snapshot in Yuan's case, which is
> combining CP10 of PR1 and CP8 of PR2. This should be true if neither failed
> checkpointing nor uncommitted consuming have side effects, both of which
> can break the exactly-once semantics when replaying. So I think there
> should be no difference between rescaling the combined global snapshot or
> the globally taken one, i.e. if the input partitions are not independent,
> we are probably not able to rescale the source state in the current Flink
> eiter.
>
> And @Thomas, I do agree that the operational burden is
> significantly reduced, while I'm a little afraid that checkpointing the
> subgraphs individually may increase most of the runtime overhead back
> again. Maybe we can find a better way to implement this.
>
> On Tue, Feb 8, 2022 at 5:11 AM Thomas Weise  wrote:
>
> > Hi,
> >
> > Thanks for opening this discussion! The proposed enhancement would be
> > interesting for use cases in our infrastructure as well.
> >
> > There are scenarios where it makes sense to have multiple disconnected
> > subgraphs in a single job because it can significantly reduce the
> > operational burden as well as the runtime overhead. Since we allow
> > subgraphs to recover independently, then why not allow them to make
> > progress independently also, which would imply that checkpointing must
> > succeed for non affected subgraphs as certain behavior is tied to
> > checkpoint completion, like Kafka offset commit, file output etc.
> >
> > As for source offset redistribution, offset/position needs to be tied
> > to splits (in FLIP-27) and legacy sources. (It applies to both Kafka
> > and Kinesis legacy sources and FLIP-27 Kafka source.). With the new
> > source framework, it would be hard to implement a source with correct
> > behavior that does not track the position along with the split.
> >
> > In Yuan's example, is there a reason why CP8 could not be promoted to
> > CP10 by the coordinator for PR2 once it receives the notification that
> > CP10 did not complete? It appears that should be possible since in its
> > effect it should be no different than no data processed between CP8
> > and CP10?
> >
> > Thanks,
> > Thomas
> >
> > On Mon, Feb 7, 2022 at 2:36 AM Till Rohrmann 
> wrote:
> > >
> > > Thanks for the clarification Yuan and Gen,
> > >
> > > I agree that the checkpointing of the sources needs to support the
> > > rescaling case, otherwise it does not work. Is there currently a source
> > > implementation where this wouldn't work? For Kafka it should work
> because
> > > we store the offset per assigned partition. For Kinesis it is probably
> > the
> > > same. For the Filesource we store the set of unread input splits in the
> > > 

Email from 高大余

2022-02-08 Thread 高大余
Hi Guys,

 

I want to contribute to Apache Flink.

Would you please give me the permission as a contributor?

My JIRA ID is dygao.

[jira] [Created] (FLINK-26003) Use Jackson serialization for persisting TaskExecutor state to working directory

2022-02-08 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-26003:
-

 Summary: Use Jackson serialization for persisting TaskExecutor 
state to working directory
 Key: FLINK-26003
 URL: https://issues.apache.org/jira/browse/FLINK-26003
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.15.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.15.0


In order to avoid Java serialization, we should use a different serialization 
format for persisting {{TaskExecutor}} state in the working directory. One idea 
could be to use Jackson for the serialization.
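
A minimal sketch of that idea (the store class and its shape are hypothetical, not the planned implementation):

{code:java}
import java.io.File;
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;

/** Sketch: persist a POJO as JSON instead of relying on java.io.Serializable. */
final class WorkingDirectoryStateStore<T> {

    private final ObjectMapper mapper = new ObjectMapper();

    void persist(File file, T state) throws IOException {
        mapper.writeValue(file, state); // human-readable and schema-tolerant
    }

    T load(File file, Class<T> type) throws IOException {
        return mapper.readValue(file, type);
    }
}
{code}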



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26002) Add test coverage for native format job upgrades

2022-02-08 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-26002:
-

 Summary: Add test coverage for native format job upgrades
 Key: FLINK-26002
 URL: https://issues.apache.org/jira/browse/FLINK-26002
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.15.0
Reporter: Anton Kalashnikov
Assignee: Anton Kalashnikov
 Fix For: 1.15.0


# Initialization of a job with a stateful operator.
# Do a native savepoint/canonical savepoint/aligned checkpoint:
## Change the job shape (new custom operator) and/or record types.
## One job, with all possible network exchanges (forward, keyBy, rebalance, 
broadcast, random, rescale), followed by a stateful operator.
## No need to modify the state schema. Just validate in some way that after the 
upgrade state is assigned to the correct operators (so the state should affect 
the result of the operators/functions).
## Some record type that would allow us to validate the consistency of the 
computations.
## A validating sink, checking for consistency.
## Arbitrary job upgrade. Add two new operators: a chained mapping operator and 
another keyed operator preceded by a keyed exchange. Change the record type from 
int to string, without changing the “logical” value of the record (for example, 
change 1 → "1").
## Job upgrade without changing the graph record type: no need to test for that.
# Restore from the savepoint/checkpoint (see the CLI sketch below).
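
For reference, a sketch of the stop/restore cycle such a test would exercise (the {{--type}} flag per FLIP-203; job ids, paths, and jar names are placeholders):

{noformat}
# stop the job with a savepoint in the chosen format (native or canonical)
bin/flink stop --type native --savepointPath s3://bucket/savepoints <jobId>

# submit the upgraded job, restoring from that savepoint
bin/flink run --fromSavepoint s3://bucket/savepoints/savepoint-xxxx upgraded-job.jar
{noformat}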



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26001) Implement ProjectableDecodingFormat for avro bulk format

2022-02-08 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-26001:
---

 Summary: Implement ProjectableDecodingFormat for avro bulk format
 Key: FLINK-26001
 URL: https://issues.apache.org/jira/browse/FLINK-26001
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Caizhi Weng
 Fix For: 1.15.0


Currently the avro {{BulkDecodingFormat}} does not also implement 
{{ProjectableDecodingFormat}}, so the reader has to read unused columns. We 
need to implement {{ProjectableDecodingFormat}} to optimize reading from avro 
files.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [RESULT][VOTE] FLIP-211: Kerberos delegation token framework

2022-02-08 Thread Gabor Somogyi
David,

Thanks for making the design better!
I count on you to pinpoint bugs as early as possible in the upcoming PRs.
I'm just testing the first one on a cluster; I hope I can file it in the next
couple of days...

BR,
G


On Tue, Feb 8, 2022 at 9:27 AM David Morávek  wrote:

> Thanks Gabor for driving this, I think the change is going to be really
> valuable for some of the enterprise users.
>
> Best,
> D.
>
>
> On Tue, Feb 8, 2022 at 8:33 AM Gabor Somogyi 
> wrote:
>
> > Hi devs,
> >
> > FLIP-211 [1] Has been accepted.
> > There were 3 binding votes and 2 non-binding in favor.
> > None against.
> >
> > Votes are in the order of arrival:
> >
> > Binding:
> > Gyula Fora
> > Marton Balassi
> > Chesnay Schepler
> >
> > Non-binding:
> > Junfan Zhang
> > David Moravek
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-211%3A+Kerberos+delegation+token+framework
> >
> > BR,
> > G
> >
>


Re: [VOTE] Deprecate Per-Job Mode in Flink 1.15

2022-02-08 Thread Konstantin Knauf
Thank you everyone, I've closed the vote and created tickets for the
deprecation [1] and the dropping [2], and linked the current blockers for
dropping it to the latter.

If or when you encounter new blockers, please link them to [2].

[1] https://issues.apache.org/jira/browse/FLINK-25999
[2] https://issues.apache.org/jira/browse/FLINK-26000


On Sun, Feb 6, 2022 at 6:44 AM Yang Wang  wrote:

> Thanks Konstantin for the explanation.
>
> +1 (binding) from me now.
>
>
> Best,
> Yang
>
> Xintong Song wrote on Thu, Feb 3, 2022 at 09:42:
>
> > Thanks for the clarification, Konstantin.
> >
> > +1 for deprecating per-job mode in Flink 1.15, and reevaluating when to
> > drop it after Flink 1.16.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Tue, Feb 1, 2022 at 5:27 PM Konstantin Knauf 
> wrote:
> >
> > > Hi Xintong, Hi Yang, Hi everyone,
> > >
> > > Thank you for speaking up. The vote is formally only about the
> > deprecation
> > > in Flink 1.15.
> > >
> > > We can and should continue to collect blockers for the deletion of
> > per-job
> > > mode on YARN. Then there should be one release that allows users to
> > switch.
> > > So, Flink 1.16 indeed is unrealistic for dropping, as we would need to
> > > address all Blockers still in Flink 1.15.
> > >
> > > I think a certain degree of urgency helps us to address these issues
> and
> > > encourages users to switch to application mode. So, I would continue to
> > > target Flink 1.17 for dropping per-job mode, but let's reevaluate after
> > > Flink 1.16.
> > >
> > > Hope this helps,
> > >
> > > Konstantin
> > >
> > > Since we recently decided that
> > > On Sun, Jan 30, 2022 at 4:13 AM Yang Wang 
> wrote:
> > >
> > > > Hi all,
> > > >
> > > >
> > > >
> > > > I second Xintong’s comments to not drop the per-job mode too
> > > aggressively.
> > > > And I am afraid
> > > >
> > > > we need to get more inputs from users after deprecating the per-job
> > mode
> > > in
> > > > release-1.15.
> > > >
> > > >
> > > > Most Flink on YARN users are using CLI command to integrate with the
> > job
> > > > lifecycle management system.
> > > >
> > > > And they are still using the old compatibility mode "flink run -m
> > > > yarn-cluster", not the generic CLI mode "--target
> > > > yarn-per-job/yarn-application".
> > > >
> > > > Apart from the functionalities, they need some time to upgrade the
> > > external
> > > > systems.
> > > >
> > > >
> > > > BTW, the application mode does not support attached mode now. Some
> > users
> > > > have asked for this in FLINK-25495[1].
> > > >
> > > >
> > > > [1]. https://issues.apache.org/jira/browse/FLINK-25495
> > > >
> > > >
> > > >
> > > >
> > > > Best,
> > > >
> > > > Yang
> > > >
> > > > Xintong Song wrote on Sun, Jan 30, 2022 at 08:35:
> > > >
> > > > > Hi Konstantin,
> > > > >
> > > > > Could we be more specific about what this vote is for? I'm asking
> > > > because I
> > > > > don't think we have consensus on all you have mentioned.
> > > > >
> > > > > To be specific, I'd be +1 for deprecating per-job mode in 1.15.
> > > However,
> > > > > I'm not sure about the following.
> > > > > - Targeting to drop it in 1.16 or 1.17. TBH, I'd expect to stay
> > > > compatible
> > > > > on the per-job mode a bit longer.
> > > > > - Targeting Yarn application mode on par with the standalone /
> K8s. I
> > > > think
> > > > > we need the Yarn application mode on par with the Yarn per-job
> mode,
> > as
> > > > the
> > > > > latter is being dropped and users are migrating from.
> > > > > - FLINK-24897 being the only blocker for dropping the per-job
> mode. I
> > > > think
> > > > > a good time to drop the per-job mode is probably when we know most
> > > users
> > > > > have migrated to the application mode. Even if the Yarn application
> > > mode
> > > > > provides equivalent functionality as the Yarn per-job mode does,
> it's
> > > > > probably nicer to not force users to migrate if the per-job mode is
> > > still
> > > > > widely used.
> > > > >
> > > > > Discussing the above items is not my purpose here. Just trying to
> say
> > > > that
> > > > > IMHO in the previous discussion [1] we have not reached consensus
> on
> > > all
> > > > > the things mentioned in this voting thread. Consequently, if these
> > are
> > > > all
> > > > > included in the scope of the vote, I'm afraid I cannot give my +1
> on
> > > > this.
> > > > > Sorry if I'm nitpicking.
> > > > >
> > > > > Thank you~
> > > > >
> > > > > Xintong Song
> > > > >
> > > > >
> > > > > [1]
> https://lists.apache.org/thread/b8g76cqgtr2c515rd1bs41vy285f317n
> > > > >
> > > > > On Sat, Jan 29, 2022 at 2:27 PM Jing Zhang 
> > > wrote:
> > > > >
> > > > > > +1 (binding)
> > > > > >
> > > > > > Thanks Konstantin for driving this.
> > > > > >
> > > > > > Best,
> > > > > > Jing Zhang
> > > > > >
> > > > > > Chenya Zhang wrote on Sat, Jan 29, 2022 at
> 07:04:
> > > > > >
> > > > > > > +1 (non-binding)
> > > > > > >
> > > > > > > On Fri, Jan 28, 2022 at 12:46 PM Thomas Weise 
> > > > wrote:
> > > > > > >
> > > > > > > > +1 (binding)
> > > > > 

Re: [RESULT][VOTE] FLIP-211: Kerberos delegation token framework

2022-02-08 Thread David Morávek
Thanks Gabor for driving this, I think the change is going to be really
valuable for some of the enterprise users.

Best,
D.


On Tue, Feb 8, 2022 at 8:33 AM Gabor Somogyi 
wrote:

> Hi devs,
>
> FLIP-211 [1] Has been accepted.
> There were 3 binding votes and 2 non-binding in favor.
> None against.
>
> Votes are in the order of arrival:
>
> Binding:
> Gyula Fora
> Marton Balassi
> Chesnay Schepler
>
> Non-binding:
> Junfan Zhang
> David Moravek
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-211%3A+Kerberos+delegation+token+framework
>
> BR,
> G
>


[RESULT] [VOTE] Deprecate Per-Job Mode

2022-02-08 Thread Konstantin Knauf
Hi everyone,

The vote on deprecating per-job mode in Flink 1.15 has been
unanimously approved [1].

I've created tickets for the deprecation [2] and the dropping [3], and linked
the current blockers for dropping it to the latter.

Binding +1
Thomas Weise
Xintong Song
Yang Wang
Jing Zhang
Till Rohrmann

Non-Binding +1
Chenya Zhang
David Moravek
Gabor Somogyi

Cheers,

Konstantin

[1] https://lists.apache.org/thread/v6oz92dfp95qcox45l0f8393089oyjv4
[2] https://issues.apache.org/jira/browse/FLINK-25999
[3] https://issues.apache.org/jira/browse/FLINK-26000

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


[jira] [Created] (FLINK-26000) Drop Per-Job Mode

2022-02-08 Thread Konstantin Knauf (Jira)
Konstantin Knauf created FLINK-26000:


 Summary: Drop Per-Job Mode
 Key: FLINK-26000
 URL: https://issues.apache.org/jira/browse/FLINK-26000
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Konstantin Knauf






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25999) Deprecate Per-Job Mode

2022-02-08 Thread Konstantin Knauf (Jira)
Konstantin Knauf created FLINK-25999:


 Summary: Deprecate Per-Job Mode
 Key: FLINK-25999
 URL: https://issues.apache.org/jira/browse/FLINK-25999
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Konstantin Knauf
 Fix For: 1.15.0


As discussed in [1] and voted on in [2], the community has decided to deprecate 
per-job mode.

[1] https://lists.apache.org/thread/b8g76cqgtr2c515rd1bs41vy285f317n
[2] https://lists.apache.org/thread/v6oz92dfp95qcox45l0f8393089oyjv4



--
This message was sent by Atlassian Jira
(v8.20.1#820001)