[jira] [Created] (FLINK-20015) Failure to execute streaming query
Satyam Shekhar created FLINK-20015: -- Summary: Failure to execute streaming query Key: FLINK-20015 URL: https://issues.apache.org/jira/browse/FLINK-20015 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.11.1 Reporter: Satyam Shekhar

Hello, I have a table T0 with the following schema -

root
 |-- amount: BIGINT
 |-- timestamp: TIMESTAMP(3)

The following two queries fail when executed on the above table in streaming mode using the Blink planner.

WITH A AS (
  SELECT COUNT(*) AS ct, tumble_end(`timestamp`, INTERVAL '1' MINUTE) as tm
  FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))
select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm

WITH A AS (
  SELECT COUNT(*) AS ct, tumble_rowtime(`timestamp`, INTERVAL '1' MINUTE) as tm
  FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))
select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm

The two queries are very similar and differ only in their use of the tumble_end and tumble_rowtime operators. Both queries use the timestamp column as their rowtime attribute. Casting the "tm" column to TIMESTAMP makes both queries work -

WITH A AS (
  SELECT COUNT(*) AS ct, CAST(tumble_end(`timestamp`, INTERVAL '1' MINUTE) as TIMESTAMP(3)) as tm
  FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))
select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm

This workaround, however, loses the rowtime attribute from the output result set for the second query.

The first query fails with the following exception -

java.lang.RuntimeException: class java.sql.Timestamp cannot be cast to class java.lang.Long (java.sql.Timestamp is in module java.sql of loader 'platform'; java.lang.Long is in module java.base of loader 'bootstrap') at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at SinkConversion$166.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at StreamExecCalc$163.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:314) at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:206) at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement1(StreamingJoinOperator.java:115) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:132) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:99) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:179) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) at
[jira] [Created] (FLINK-20016) Support TimestampAssigner and WatermarkGenerator for Python DataStream API.
Shuiqiang Chen created FLINK-20016: -- Summary: Support TimestampAssigner and WatermarkGenerator for Python DataStream API. Key: FLINK-20016 URL: https://issues.apache.org/jira/browse/FLINK-20016 Project: Flink Issue Type: Improvement Components: API / Python Reporter: Shuiqiang Chen Fix For: 1.12.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20014) Resolve CVE-2020-11022 and CVE-2020-11023 in scala-compiler
Alan Leong created FLINK-20014: -- Summary: Resolve CVE-2020-11022 and CVE-2020-11023 in scala-compiler Key: FLINK-20014 URL: https://issues.apache.org/jira/browse/FLINK-20014 Project: Flink Issue Type: Improvement Components: Build System Reporter: Alan Leong Update the Scala version to resolve CVE-2020-11022 and CVE-2020-11023 in scala-compiler. This was addressed in Scala 2.12.12 (see https://github.com/scala/scala/pull/8963 and https://github.com/scala/bug/issues/11974). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20013) BoundedBlockingSubpartition may leak network buffer if task is failed or canceled
Yingjie Cao created FLINK-20013: --- Summary: BoundedBlockingSubpartition may leak network buffer if task is failed or canceled Key: FLINK-20013 URL: https://issues.apache.org/jira/browse/FLINK-20013 Project: Flink Issue Type: Bug Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.12.0 BoundedBlockingSubpartition may leak network buffers if the task fails or is canceled. We need to recycle the current BufferConsumer when the task fails or is canceled. -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: Investigating flinK
Hi, you can write a custom log appender that modifies the logs before they are sent. Thanks. Diana El-Masri wrote on Fri, Nov 6, 2020 at 7:47 AM: > Hi, > > No the logs of the sources connected to flink. > > Thanks > > Chesnay Schepler wrote: > > > Are you referring to the log files of Flink? > > > > On 11/5/2020 7:01 PM, Diana El-Masri wrote: > >> Hi, > >> > >> I am starting my PhD at "Ecole Polytechnique of Montreal" on IoT > >> log management. I am considering using Flink for my edge layer > >> processing. Could please advise if there is a possibility to write > >> a flink plugin that intercepts and modify the logs before they are > >> sent to the user/cloud. if yes, what is the best way to achieve > >> this with Flink? > >> Thanks > >> > >> > > > >
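For illustration, here is a minimal, hypothetical sketch of such an interception point using Log4j 2's RewritePolicy extension point (Log4j 2 is the logging backend recent Flink distributions ship). The class name, plugin name, and masking rule below are made up for the example and are not part of Flink or Log4j:

{code:java}
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.rewrite.RewritePolicy;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.impl.Log4jLogEvent;
import org.apache.logging.log4j.message.SimpleMessage;

/**
 * Rewrites every log message before it reaches the downstream appender,
 * e.g. the appender that ships logs to the user/cloud.
 */
@Plugin(name = "MaskingRewritePolicy", category = "Core", elementType = "rewritePolicy", printObject = true)
public final class MaskingRewritePolicy implements RewritePolicy {

    @PluginFactory
    public static MaskingRewritePolicy createPolicy() {
        return new MaskingRewritePolicy();
    }

    @Override
    public LogEvent rewrite(LogEvent source) {
        // Illustrative transformation: redact anything that looks like a device serial.
        String masked = source.getMessage().getFormattedMessage()
                .replaceAll("serial=\\S+", "serial=***");
        // Copy the original event, replacing only its message.
        return new Log4jLogEvent.Builder(source)
                .setMessage(new SimpleMessage(masked))
                .build();
    }
}
{code}

Such a policy would then be referenced from a Rewrite appender in the logging configuration, wrapping whatever appender actually forwards the logs; the same idea applies if the log sources use a different logging framework.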
Re: [DISCUSS] Move license check utility to a new repository to share it with flink-statefun
Hi Robert, I think this could be useful in flink-statefun. StateFun currently has two modules that bundle dependencies, most importantly the `flink-statefun-distribution` module which currently bundles some Flink dependencies as well as Flink connectors (Kafka, Kinesis). Upgrading the Flink version in StateFun typically involves a bulk update of the NOTICE file of that module, so some automatic validation in CI could help with that. The other module that bundles dependencies is the StateFun examples, for which we've been considering no longer releasing Maven artifacts. On Thu, Nov 5, 2020 at 9:54 PM Robert Metzger wrote: > 1. Is this relevant for flink-statefun? > So, really there is only one module that would benefit from this tool (which could possibly be enough already for sharing to make sense). To justify the efforts for sharing this nice utility, I'd like to have a better idea of: how do you intend the downstream CIs in flink / flink-statefun to use this? Would there be released artifacts from `flink-project-utils` to expose each tool (e.g. the `LicenseChecker`)? It almost looks as if it would be easiest to reuse this tool if it was provided as a Maven plugin, though I'm not sure how possible that is, and it is probably out of scope for this discussion. > > 2. For the repository name, what do you think about "flink-project-utils" ? > I'd like to use a generic name so that we can potentially share other > internal utilities. > I like the idea of sharing internal utilities in general across the two repos. To gauge how useful this repo would be in the end, here's some info on what StateFun has already copied into flink-statefun: - We are about to copy the check for dead links in the docs [1] - Several release-related scripts for creating source bundles, deploying staging jars, updating the branch version, etc. [2]. However, these have somewhat evolved in StateFun to tailor them to flink-statefun, so I guess it would not make sense to share release-related tooling. - Utility around building the documentation (currently flink and flink-statefun share the same Jekyll setup). Best, Gordon [1] https://github.com/apache/flink-statefun/pull/171 [2] https://github.com/apache/flink-statefun/tree/master/tools/releasing
[jira] [Created] (FLINK-20012) Hive 3.1 integration exception
Dino Zhang created FLINK-20012: -- Summary: Hive 3.1 integration exception Key: FLINK-20012 URL: https://issues.apache.org/jira/browse/FLINK-20012 Project: Flink Issue Type: Bug Components: Connectors / Hive Affects Versions: 1.11.2 Reporter: Dino Zhang After adding the extra dependencies to the /lib directory, configuring the Hive conf in sql-client-defaults.yaml, and running /sql-client.sh embedded, I get the following error: {code:java} Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V at org.apache.hadoop.conf.Configuration.set(Configuration.java:1357) at org.apache.hadoop.conf.Configuration.set(Configuration.java:1338) at org.apache.hadoop.mapred.JobConf.setJar(JobConf.java:536) at org.apache.hadoop.mapred.JobConf.setJarByClass(JobConf.java:554) at org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:448) at org.apache.hadoop.hive.conf.HiveConf.initialize(HiveConf.java:5141) at org.apache.hadoop.hive.conf.HiveConf.<init>(HiveConf.java:5109) at org.apache.flink.table.catalog.hive.HiveCatalog.createHiveConf(HiveCatalog.java:209) at org.apache.flink.table.catalog.hive.HiveCatalog.<init>(HiveCatalog.java:161) at org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory.createCatalog(HiveCatalogFactory.java:84) at org.apache.flink.table.client.gateway.local.ExecutionContext.createCatalog(ExecutionContext.java:378) at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$null$5(ExecutionContext.java:626) at java.util.HashMap.forEach(HashMap.java:1289) at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:625) at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:264) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:624) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523) at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:183) at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:136) at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:859) ... 3 more {code} At the same time, I found the guava-18 version in flink-1.11.2, but the guava-27 version in Hive 3.1. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20011) PageRankITCase.testPrintWithRMatGraph hangs
Dian Fu created FLINK-20011: --- Summary: PageRankITCase.testPrintWithRMatGraph hangs Key: FLINK-20011 URL: https://issues.apache.org/jira/browse/FLINK-20011 Project: Flink Issue Type: Improvement Components: Library / Graph Processing (Gelly) Affects Versions: 1.12.0 Reporter: Dian Fu [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9121=logs=1fc6e7bf-633c-5081-c32a-9dea24b05730=80a658d1-f7f6-5d93-2758-53ac19fd5b19] {code} 2020-11-05T22:42:34.4186647Z "main" #1 prio=5 os_prio=0 tid=0x7fa98c00b800 nid=0x32f8 waiting on condition [0x7fa995c12000] 2020-11-05T22:42:34.4187168Z java.lang.Thread.State: WAITING (parking) 2020-11-05T22:42:34.4187563Z at sun.misc.Unsafe.park(Native Method) 2020-11-05T22:42:34.4188246Z - parking to wait for <0x8736d120> (a java.util.concurrent.CompletableFuture$Signaller) 2020-11-05T22:42:34.411Z at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) 2020-11-05T22:42:34.4189351Z at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) 2020-11-05T22:42:34.4189930Z at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) 2020-11-05T22:42:34.4190509Z at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) 2020-11-05T22:42:34.4191059Z at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) 2020-11-05T22:42:34.4191591Z at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:893) 2020-11-05T22:42:34.4192208Z at org.apache.flink.graph.asm.dataset.DataSetAnalyticBase.execute(DataSetAnalyticBase.java:55) 2020-11-05T22:42:34.4192787Z at org.apache.flink.graph.drivers.output.Print.write(Print.java:48) 2020-11-05T22:42:34.4193373Z at org.apache.flink.graph.Runner.execute(Runner.java:454) 2020-11-05T22:42:34.4194156Z at org.apache.flink.graph.Runner.main(Runner.java:507) 2020-11-05T22:42:34.4194618Z at org.apache.flink.graph.drivers.DriverBaseITCase.getSystemOutput(DriverBaseITCase.java:208) 2020-11-05T22:42:34.4195192Z at org.apache.flink.graph.drivers.DriverBaseITCase.expectedCount(DriverBaseITCase.java:100) 2020-11-05T22:42:34.4195914Z at org.apache.flink.graph.drivers.PageRankITCase.testPrintWithRMatGraph(PageRankITCase.java:60) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20010) SinkITCase.writerAndCommitterAndGlobalCommitterExecuteInStreamingMode fails on Azure Pipeline
Yingjie Cao created FLINK-20010: --- Summary: SinkITCase.writerAndCommitterAndGlobalCommitterExecuteInStreamingMode fails on Azure Pipeline Key: FLINK-20010 URL: https://issues.apache.org/jira/browse/FLINK-20010 Project: Flink Issue Type: Bug Components: Tests Reporter: Yingjie Cao Fix For: 1.12.0 SinkITCase.writerAndCommitterAndGlobalCommitterExecuteInStreamingMode fails on Azure Pipeline {code:java} at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365) at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273) at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238) at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159) at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384) at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345) at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126) at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: Investigating flinK
Hi, No, the logs of the sources connected to Flink. Thanks Chesnay Schepler wrote: Are you referring to the log files of Flink? On 11/5/2020 7:01 PM, Diana El-Masri wrote: Hi, I am starting my PhD at "Ecole Polytechnique of Montreal" on IoT log management. I am considering using Flink for my edge layer processing. Could please advise if there is a possibility to write a flink plugin that intercepts and modify the logs before they are sent to the user/cloud. if yes, what is the best way to achieve this with Flink? Thanks
Re: Investigating flinK
Are you referring to the log files of Flink? On 11/5/2020 7:01 PM, Diana El-Masri wrote: Hi, I am starting my PhD at "Ecole Polytechnique of Montreal" on IoT log management. I am considering using Flink for my edge layer processing. Could please advise if there is a possibility to write a flink plugin that intercepts and modify the logs before they are sent to the user/cloud. if yes, what is the best way to achieve this with Flink? Thanks
[jira] [Created] (FLINK-20009) Add 404 check to docs build
Seth Wiesman created FLINK-20009: Summary: Add 404 check to docs build Key: FLINK-20009 URL: https://issues.apache.org/jira/browse/FLINK-20009 Project: Flink Issue Type: Improvement Components: Stateful Functions Reporter: Seth Wiesman Assignee: Seth Wiesman -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20008) Java Deadlock in ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement()
Stephan Ewen created FLINK-20008: Summary: Java Deadlock in ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement() Key: FLINK-20008 URL: https://issues.apache.org/jira/browse/FLINK-20008 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.12.0 Reporter: Stephan Ewen Fix For: 1.12.0 The stack trace detects a deadlock between the testing thread and the curator event thread. Full log: https://dev.azure.com/sewen0794/Flink/_build/results?buildId=176=logs=6e58d712-c5cc-52fb-0895-6ff7bd56c46b=f30a8e80-b2cf-535c-9952-7f521a4ae374 Relevant Stack Trace: {code} Found one Java-level deadlock: = "main-EventThread": waiting to lock monitor 0x7f74c00045e8 (object 0x8ed14cb0, a java.lang.Object), which is held by "main" "main": waiting to lock monitor 0x7f74e401a1f8 (object 0x8ed15008, a org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch), which is held by "main-EventThread" Java stack information for the threads listed above: === "main-EventThread": at org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.onGrantLeadership(DefaultLeaderElectionService.java:186) - waiting to lock <0x8ed14cb0> (a java.lang.Object) at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver.isLeader(ZooKeeperLeaderElectionDriver.java:158) at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch$9.apply(LeaderLatch.java:693) at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch$9.apply(LeaderLatch.java:689) at org.apache.flink.shaded.curator4.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:100) at org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30) at org.apache.flink.shaded.curator4.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:92) at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch.setLeadership(LeaderLatch.java:688) - locked <0x8ed15008> (a org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch) at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch.checkLeadership(LeaderLatch.java:567) at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch.access$700(LeaderLatch.java:65) at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch$7.processResult(LeaderLatch.java:618) at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.sendToBackgroundCallback(CuratorFrameworkImpl.java:883) at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.processBackgroundOperation(CuratorFrameworkImpl.java:653) at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.WatcherRemovalFacade.processBackgroundOperation(WatcherRemovalFacade.java:152) at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.GetChildrenBuilderImpl$2.processResult(GetChildrenBuilderImpl.java:187) at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:601) at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:508) "main": at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch.close(LeaderLatch.java:203) - waiting to 
lock <0x8ed15008> (a org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch) at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch.close(LeaderLatch.java:190) at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver.close(ZooKeeperLeaderElectionDriver.java:140) at org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.stop(DefaultLeaderElectionService.java:103) - locked <0x8ed14cb0> (a java.lang.Object) at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement(ZooKeeperLeaderElectionTest.java:310) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
Investigating flinK
Hi, I am starting my PhD at "Ecole Polytechnique of Montreal" on IoT log management. I am considering using Flink for my edge layer processing. Could you please advise whether it is possible to write a Flink plugin that intercepts and modifies the logs before they are sent to the user/cloud? If yes, what is the best way to achieve this with Flink? Thanks
[jira] [Created] (FLINK-20007) SinkTransformationTranslator fail to handle the PartitionTransformation
Guowei Ma created FLINK-20007: - Summary: SinkTransformationTranslator fail to handle the PartitionTransformation Key: FLINK-20007 URL: https://issues.apache.org/jira/browse/FLINK-20007 Project: Flink Issue Type: Sub-task Components: API / DataStream Reporter: Guowei Ma In the current version, `SinkTransformationTranslator` connects the `SinkWriter` with a `PartitionerTransformation` if the input transformation of the `SinkTransformation` is a `PartitionTransformation`. This leads to a `NullPointerException`. Instead, `SinkTransformationTranslator` should connect the `Writer` to the real upstream node if the input of the `SinkTransformation` is a `PartitionTransformation`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20006) FileSinkITCase.testFileSink: The record 0 should occur 4 times, but only occurs 8time expected:<4> but was:<8>
Robert Metzger created FLINK-20006: -- Summary: FileSinkITCase.testFileSink: The record 0 should occur 4 times, but only occurs 8time expected:<4> but was:<8> Key: FLINK-20006 URL: https://issues.apache.org/jira/browse/FLINK-20006 Project: Flink Issue Type: Bug Components: Connectors / FileSystem Affects Versions: 1.12.0 Reporter: Robert Metzger https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9082=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf {code} 2020-11-05T13:31:16.7006473Z [ERROR] Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 5.565 s <<< FAILURE! - in org.apache.flink.connector.file.sink.FileSinkITCase 2020-11-05T13:31:16.7007237Z [ERROR] testFileSink[executionMode = STREAMING, triggerFailover = true](org.apache.flink.connector.file.sink.FileSinkITCase) Time elapsed: 0.548 s <<< FAILURE! 2020-11-05T13:31:16.7007897Z java.lang.AssertionError: The record 0 should occur 4 times, but only occurs 8time expected:<4> but was:<8> 2020-11-05T13:31:16.7008317Zat org.junit.Assert.fail(Assert.java:88) 2020-11-05T13:31:16.7008644Zat org.junit.Assert.failNotEquals(Assert.java:834) 2020-11-05T13:31:16.7008987Zat org.junit.Assert.assertEquals(Assert.java:645) 2020-11-05T13:31:16.7009392Zat org.apache.flink.connector.file.sink.FileSinkITCase.checkResult(FileSinkITCase.java:218) 2020-11-05T13:31:16.7009889Zat org.apache.flink.connector.file.sink.FileSinkITCase.testFileSink(FileSinkITCase.java:132) 2020-11-05T13:31:16.7010316Zat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: Resource Optimization for Flink Job in AWS EMR Cluster
Thanks Prasanna & Till for quick response. Looks like my use case is very similar to yours ,I will try to run multiple containers on the same machine and will update you accordingly. Thanks , -Deep On Thu, Nov 5, 2020 at 2:33 PM Till Rohrmann wrote: > Hi Deep, > > you can increase the average CPU load by reducing the number of overall > resources. Having fewer slots over which you can distribute the work should > increase the resource usage. > > Cheers, > Till > > On Thu, Nov 5, 2020 at 9:03 AM Prasanna kumar < > prasannakumarram...@gmail.com> > wrote: > > > Deep, > > > > 1) Is it a cpu/memory/io intensive job ?? > > > > Based on that you could allocate resources. > > > > From the question, if the CPU is not utilised , you could run multiple > > containers on the same machine(tm) ... > > > > Following may not be exact case as yours but to give you an idea. > > > > Few months back I have run jobs in emr processing 4-8k per second from > > kafka with paralleism of 8 doing lightweight transformation where end to > > end latency was less than a second (10-50ms). > > > > I used slots where memory allocated is 4GB and JM memory 1gb. Here > > multilple containers ran on the same machine and I got cpu usgae upto > 50%. > > Earlier it was in single digits when just single container ran on a > > machine. > > > > Prasanna. > > > > > > On Thu 5 Nov, 2020, 12:40 Satyaa Dixit, wrote: > > > > > Hi Deep, > > > > > > Thanks for bringing this on table, I'm also facing a similar kind of > > issue > > > while deploying my flink Job w.r.t resources optimization. > > > > > > Hi Team, > > > > > > It would be much appreciated if someone helps us here. > > > > > > > > > Regards, > > > Satya > > > > > > On Wed, Nov 4, 2020 at 6:33 PM DEEP NARAYAN Singh < > about.d...@gmail.com> > > > wrote: > > > > > > > Hi All, > > > > > > > > I am running a flink streaming job in EMR Cluster with parallelism 21 > > > > having 500 records per second.But still seeing cpu utilization is > > > > approximate 5-8 percent. > > > > > > > > Below is the long running session command in EMR Cluster having 3 > > > instance > > > > of type C52xlarge(8vcore, 16 GB memory, AWS resource) > > > > > > > > *sudo flink-yarn-session -n 3 -s 7 -jm 4168 -tm 8000 -d* > > > > > > > > Anyone can suggest some configuration to maximize the CPU > utilization? > > > > And Also what would be the standard utilization of CPU for flink job > in > > > > order to achieve the minimum latency? > > > > > > > > Any leads would be appreciated. > > > > > > > > Thanks, > > > > -Deep > > > > > > > > > > > > > -- > > > -- > > > Best Regards > > > Satya Prakash > > > (M)+91-9845111913 > > > > > >
[jira] [Created] (FLINK-20005) "Kerberized YARN application" test unstable
Robert Metzger created FLINK-20005: -- Summary: "Kerberized YARN application" test unstable Key: FLINK-20005 URL: https://issues.apache.org/jira/browse/FLINK-20005 Project: Flink Issue Type: Bug Components: Deployment / YARN, Runtime / Coordination Affects Versions: 1.12.0 Reporter: Robert Metzger https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9066=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529 The {{Running Kerberized YARN application on Docker test (default input)}} is failing. These are some exceptions spotted in the logs: {code} 2020-11-05T14:22:29.3315695Z Nov 05 14:22:29 2020-11-05 14:21:52,696 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Flat Map (2/3) (7806b7a7074425c5ff0906befd94e122) switched from SCHEDULED to FAILED on not deployed. 2020-11-05T14:22:29.3318307Z Nov 05 14:22:29 java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout 2020-11-05T14:22:29.3320512Z Nov 05 14:22:29at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_272] 2020-11-05T14:22:29.3322173Z Nov 05 14:22:29at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_272] 2020-11-05T14:22:29.3323809Z Nov 05 14:22:29at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607) ~[?:1.8.0_272] 2020-11-05T14:22:29.3325448Z Nov 05 14:22:29at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) ~[?:1.8.0_272] 2020-11-05T14:22:29.3331094Z Nov 05 14:22:29at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_272] 2020-11-05T14:22:29.3332769Z Nov 05 14:22:29at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_272] 2020-11-05T14:22:29.3335736Z Nov 05 14:22:29at org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:195) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] 2020-11-05T14:22:29.3342621Z Nov 05 14:22:29at org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:147) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] 2020-11-05T14:22:29.3348463Z Nov 05 14:22:29at org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:84) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] 2020-11-05T14:22:29.3353749Z Nov 05 14:22:29at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] 2020-11-05T14:22:29.3362495Z Nov 05 14:22:29at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:87) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] 2020-11-05T14:22:29.3366937Z Nov 05 14:22:29at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_272] 2020-11-05T14:22:29.3370686Z Nov 05 14:22:29at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_272] 2020-11-05T14:22:29.3380715Z Nov 05 14:22:29at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] 
2020-11-05T14:22:29.3384436Z Nov 05 14:22:29at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] 2020-11-05T14:22:29.3387431Z Nov 05 14:22:29at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] 2020-11-05T14:22:29.3390333Z Nov 05 14:22:29at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] 2020-11-05T14:22:29.3392937Z Nov 05 14:22:29at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] 2020-11-05T14:22:29.3395430Z Nov 05 14:22:29at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] 2020-11-05T14:22:29.3397949Z Nov 05 14:22:29at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] 2020-11-05T14:22:29.3401799Z Nov 05 14:22:29at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
[jira] [Created] (FLINK-20004) UpperLimitExceptionParameter description is misleading
Flavio Pompermaier created FLINK-20004: -- Summary: UpperLimitExceptionParameter description is misleading Key: FLINK-20004 URL: https://issues.apache.org/jira/browse/FLINK-20004 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.11.2 Reporter: Flavio Pompermaier The maxExceptions query parameter of the /jobs/:jobid/exceptions REST API is an integer parameter, not a list of comma-separated values; this is probably a cut-and-paste error. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[DISCUSS] Move license check utility to a new repository to share it with flink-statefun
Hi all, for the upcoming 1.12 release (and beyond ;) ), I added a utility [1] that checks for the most common licensing issues. It doesn't find everything, but by far the most common case, forgetting to add a version upgrade or a changed transitive dependency to the NOTICE file, is covered. Chesnay had the idea of creating a new repository for the tool and sharing it with flink-statefun to do the license checks automatically there as well. 1. Is this relevant for flink-statefun? 2. For the repository name, what do you think about "flink-project-utils"? I'd like to use a generic name so that we can potentially share other internal utilities. Let me know what you think. Robert [1] https://github.com/apache/flink/blob/master/tools/ci/license_check.sh
[jira] [Created] (FLINK-20003) Improve slot report logging messages
Chesnay Schepler created FLINK-20003: Summary: Improve slot report logging messages Key: FLINK-20003 URL: https://issues.apache.org/jira/browse/FLINK-20003 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.12.0 Slot reports that are received by the ResourceManager are logged. Currently, such a message looks like this: {code} 16381 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received slot report from instance aabc4f58e13c5038349292df30a00d35: SlotReport{slotsStatus=[SlotStatus{slotID=d006c062-d452-4aaa-bdd3-951fba1e80e5_0, resourceProfile=ResourceProfile{managedMemory=20.000mb (20971520 bytes), networkMemory=16.000mb (16777216 bytes)}, allocationID=d8d7358d4ba604393dee5d495ee94288, jobID=ced59189ae30c41896bfd86a0217668d}, SlotStatus{slotID=d006c062-d452-4aaa-bdd3-951fba1e80e5_1, resourceProfile=ResourceProfile{managedMemory=20.000mb (20971520 bytes), networkMemory=16.000mb (16777216 bytes)}, allocationID=f4ea20ff0924653bd3e524d3a594b8ed, jobID=ced59189ae30c41896bfd86a0217668d}, SlotStatus{slotID=d006c062-d452-4aaa-bdd3-951fba1e80e5_2, resourceProfile=ResourceProfile{managedMemory=20.000mb (20971520 bytes), networkMemory=16.000mb (16777216 bytes)}, allocationID=3e191837d45879ab59b45eedf7685235, jobID=ced59189ae30c41896bfd86a0217668d}, SlotStatus{slotID=d006c062-d452-4aaa-bdd3-951fba1e80e5_3, resourceProfile=ResourceProfile{managedMemory=20.000mb (20971520 bytes), networkMemory=16.000mb (16777216 bytes)}, allocationID=null, jobID=null}]}. {code} I propose 2 changes: 1) Invert the order in which the slot status fields are printed. The job and allocation IDs are usually the interesting parts, with the resource profile commonly just being a bunch of noise. 2) introduce line-breaks (up to a limit), so that you can tell at a glance what is going on, particularly when debugging tests. -- This message was sent by Atlassian Jira (v8.3.4#803005)
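As a purely hypothetical illustration of the proposed layout (none of these class or method names exist in Flink; SlotLine is only a stand-in for SlotStatus), the message could be assembled roughly like this:

{code:java}
import java.util.List;
import java.util.stream.Collectors;

public final class SlotReportLogFormat {

    /** Stand-in for Flink's SlotStatus; not a real Flink class. */
    public static final class SlotLine {
        final String slotId;
        final String jobId;
        final String allocationId;
        final String resourceProfile;

        public SlotLine(String slotId, String jobId, String allocationId, String resourceProfile) {
            this.slotId = slotId;
            this.jobId = jobId;
            this.allocationId = allocationId;
            this.resourceProfile = resourceProfile;
        }
    }

    /** Proposed upper limit on the number of printed lines. */
    private static final int MAX_LINES = 8;

    public static String format(String instanceId, List<SlotLine> slots) {
        // Job and allocation IDs first, one slot per line, resource profile last.
        String body = slots.stream()
                .limit(MAX_LINES)
                .map(s -> String.format("  jobID=%s, allocationID=%s, slotID=%s, resourceProfile=%s",
                        s.jobId, s.allocationId, s.slotId, s.resourceProfile))
                .collect(Collectors.joining(System.lineSeparator()));
        int omitted = Math.max(0, slots.size() - MAX_LINES);
        String suffix = omitted > 0
                ? System.lineSeparator() + "  ... " + omitted + " more slot(s) omitted"
                : "";
        return "Received slot report from instance " + instanceId + ":"
                + System.lineSeparator() + body + suffix;
    }
}
{code}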
Re: [DISCUSS][docker] Adopt Jemalloc as default memory allocator for debian based Flink docker image
For your information, I executed the flink-benchmarks [1] twice within a docker container based on the flink-1.11.1 scala_2.11-java8-debian image [2]. From the results of the state benchmarks, I cannot see obvious performance changes when jemalloc is used as the memory allocator:

RocksDB keyed state backend (ops/ms):

Benchmark                              glibc      jemalloc   comparison
ListStateBenchmark.listAdd             537.613    549.926     2.29%
ListStateBenchmark.listAddAll          301.764    295.51     -2.07%
ListStateBenchmark.listAppend          521.32     522.614     0.25%
ListStateBenchmark.listGet             139.19     141.321     1.53%
ListStateBenchmark.listGetAndIterate   139.685    141.871     1.56%
ListStateBenchmark.listUpdate          534.785    559.509     4.62%
MapStateBenchmark.mapAdd               469.748    475.241     1.17%
MapStateBenchmark.mapContains          51.481     52.188      1.37%
MapStateBenchmark.mapEntries           352.439    357.951     1.56%
MapStateBenchmark.mapGet               51.903     52.065      0.31%
MapStateBenchmark.mapIsEmpty           47.38      48.16       1.65%
MapStateBenchmark.mapIterator          351.41     357.412     1.71%
MapStateBenchmark.mapKeys              361.339    359.773    -0.43%
MapStateBenchmark.mapPutAll            117.067    111.842    -4.46%
MapStateBenchmark.mapRemove            497.361    499.771     0.48%
MapStateBenchmark.mapUpdate            464.865    463.501    -0.29%
MapStateBenchmark.mapValues            350.942    358.64      2.19%
ValueStateBenchmark.valueAdd           475.55     462.627    -2.72%
ValueStateBenchmark.valueGet           713.389    729.126     2.21%
ValueStateBenchmark.valueUpdate        476.373    482.183     1.22%

Heap keyed state backend (ops/ms):

Benchmark                              glibc      jemalloc   comparison
ListStateBenchmark.listAdd             6711       6614.719   -1.43%
ListStateBenchmark.listAddAll          700.429    713.487     1.86%
ListStateBenchmark.listAppend          2841.068   2848.416    0.26%
ListStateBenchmark.listGet             2863.704   2835.862   -0.97%
ListStateBenchmark.listGetAndIterate   2790.001   2787.145   -0.10%
ListStateBenchmark.listUpdate          2802.287   2802.626    0.01%
MapStateBenchmark.mapAdd               1939.755   1950.759    0.57%
MapStateBenchmark.mapContains          1914.49    1943.529    1.52%
MapStateBenchmark.mapEntries           11836.215  11836.673   0.00%
MapStateBenchmark.mapGet               1753.817   1756.643    0.16%
MapStateBenchmark.mapIsEmpty           2980.299   2960.752   -0.66%
MapStateBenchmark.mapIterator          11151.177  11123.037  -0.25%
MapStateBenchmark.mapKeys              12956.381  12778.626  -1.37%
MapStateBenchmark.mapPutAll            1253.269   1247.15    -0.49%
MapStateBenchmark.mapRemove            2594.152   2575.233   -0.73%
MapStateBenchmark.mapUpdate            1865.745   1880.573    0.79%
MapStateBenchmark.mapValues            12546.359  12473.223  -0.58%
ValueStateBenchmark.valueAdd           3947.228   3932.003   -0.39%
ValueStateBenchmark.valueGet           3887.003   3863.13    -0.61%
ValueStateBenchmark.valueUpdate        3978.979   3973.183   -0.15%

[1] https://github.com/apache/flink-benchmarks
[2] https://github.com/apache/flink-docker/tree/master/1.11/scala_2.11-java8-debian

Best
Yun Tang

From: Yun Tang
Sent: Wednesday, November 4, 2020 15:41
To: dev@flink.apache.org
Subject: Re: [DISCUSS][docker] Adopt Jemalloc as default memory allocator for debian based Flink docker image

Hi @Billy, I found there are many benchmark cases in the two repos; which benchmark case did you run?

Best
Yun Tang

From: Xie Billy
Sent: Tuesday, November 3, 2020 22:08
To: dev@flink.apache.org
Subject: Re: [DISCUSS][docker] Adopt Jemalloc as default memory allocator for debian based Flink docker image

Hi guys:
refer: https://stackoverflow.com/questions/13027475/cpu-and-memory-usage-of-jemalloc-as-compared-to-glibc-malloc
code: https://github.com/jemalloc/jemalloc-experiments https://code.woboq.org/userspace/glibc/benchtests/
Best Regards! Billy xie(谢志民)

On Fri, Oct 30, 2020 at 4:27 PM Yun Tang wrote: > Hi > > Do you see a noticeable performance difference between the two?
> @ Stephan Ewen, as we already use jemalloc as the default memory allocator in > production, we do not have much experience comparing performance between > glibc and jemalloc. And I did not take a look at the performance difference > when I debugged the docker OOM. I'll try to run the benchmark on docker with > different allocators when I have time these days. > > @wang gang, yes, that is what I also observed when I pmap'ed the memory when > using glibc, many memory segments with 64MB size. > > @ Billy Xie, what kind of test case did you use? From my point of view, > compared to who would use more memory in some cases, we should care more > about who would behave under
[jira] [Created] (FLINK-20002) Add a StreamExecutionEnvironment#getExecutionEnvironment(Configuration) method
Dawid Wysakowicz created FLINK-20002: Summary: Add a StreamExecutionEnvironment#getExecutionEnvironment(Configuration) method Key: FLINK-20002 URL: https://issues.apache.org/jira/browse/FLINK-20002 Project: Flink Issue Type: Improvement Components: API / DataStream Reporter: Dawid Wysakowicz Assignee: Dawid Wysakowicz Fix For: 1.12.0 From a usability perspective it would be nice to be able to construct a {{StreamExecutionEnvironment}} from a given {{Configuration}}:

{code}
Configuration configuration = new Configuration();
configuration.setString("state.backend", "jobmanager");
configuration.set(CoreOptions.DEFAULT_PARALLELISM, 10);
configuration.set(PipelineOptions.AUTO_WATERMARK_INTERVAL, Duration.ofMillis(100));

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
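Until such a factory method exists, roughly the same effect can be sketched with the existing public {{StreamExecutionEnvironment#configure(ReadableConfig, ClassLoader)}} API. This is only a sketch: configure() applies just the subset of options it understands, and the class name here is made up for the example.

{code:java}
import java.time.Duration;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvFromConfiguration {
    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        configuration.setString("state.backend", "jobmanager");
        configuration.set(CoreOptions.DEFAULT_PARALLELISM, 10);
        configuration.set(PipelineOptions.AUTO_WATERMARK_INTERVAL, Duration.ofMillis(100));

        // Create the environment first, then apply the configuration to it.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.configure(configuration, EnvFromConfiguration.class.getClassLoader());
    }
}
{code}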
[jira] [Created] (FLINK-20001) Don't use setAllVerticesInSameSlotSharingGroupByDefault in StreamGraphGenerator
Aljoscha Krettek created FLINK-20001: Summary: Don't use setAllVerticesInSameSlotSharingGroupByDefault in StreamGraphGenerator Key: FLINK-20001 URL: https://issues.apache.org/jira/browse/FLINK-20001 Project: Flink Issue Type: Sub-task Components: API / DataStream Reporter: Aljoscha Krettek Assignee: Aljoscha Krettek I think the default of having all vertices in the same slot sharing group should be good for both {{BATCH}} and {{STREAMING}} right now. We can reconsider actually setting this flag in the future. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20000) Extend the license checker to ensure that 3rd party licenses are shipped
Robert Metzger created FLINK-20000: -- Summary: Extend the license checker to ensure that 3rd party licenses are shipped Key: FLINK-20000 URL: https://issues.apache.org/jira/browse/FLINK-20000 Project: Flink Issue Type: Improvement Components: Build System Reporter: Robert Metzger Extend the license checker introduced in FLINK-19810 to also ensure that 3rd party licenses are shipped in META-INF/licenses. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19999) State Processor API classes leaking into savepoint
Nico Kruber created FLINK-19999: --- Summary: State Processor API classes leaking into savepoint Key: FLINK-19999 URL: https://issues.apache.org/jira/browse/FLINK-19999 Project: Flink Issue Type: Bug Components: API / State Processor Affects Versions: 1.11.2 Reporter: Nico Kruber Currently, any configuration for serializers that you are using when writing a State Processor API job will be shared with the serializers that are used for writing a savepoint. However, your normal job shouldn't necessarily depend on (helper) classes that you only use in the StateProc API job. By default, for example, {{ExecutionConfig#autoTypeRegistrationEnabled = true}} and thus classes like {{org.apache.flink.runtime.checkpoint.OperatorSubtaskState}} will be registered with Kryo and will thus also be needed when reading the created savepoint if you have Kryo serialization in your job. This particular instance can be worked around by calling {{ExecutionConfig#disableAutoTypeRegistration()}} but the problem is probably bigger and extends to other type registrations, e.g. POJOs, as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
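For reference, a minimal sketch of the workaround mentioned above, assuming a DataSet-based State Processor API job (the class name and comments are illustrative):

{code:java}
import org.apache.flink.api.java.ExecutionEnvironment;

public class SavepointWriterWorkaround {
    public static void main(String[] args) {
        // State Processor API jobs currently run on the DataSet ExecutionEnvironment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Workaround from the issue description: turn off automatic Kryo type
        // registration so that helper classes used only by this job do not end up
        // in the serializer configuration written into the savepoint.
        env.getConfig().disableAutoTypeRegistration();

        // ... assemble and write the savepoint against this environment as usual ...
    }
}
{code}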
[jira] [Created] (FLINK-19998) Invalid Link to checkpoints and Savepoints in stateful stream processing concepts
Aditya Agarwal created FLINK-19998: -- Summary: Invalid Link to checkpoints and Savepoints in stateful stream processing concepts Key: FLINK-19998 URL: https://issues.apache.org/jira/browse/FLINK-19998 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.11.2 Reporter: Aditya Agarwal In the *docs/concepts/stateful-stream-processing.md* file, under the first section (What is State), the following two links are broken: # Checkpoints: *[checkpoints](\{{ site.baseurl}}\{% link dev/stream/state/checkpointing.md %})* # Savepoints: *[savepoints](\{{ site.baseurl }}\{%link ops/state/savepoints.md %})* This results in the following target links: # For Checkpoints: [https://ci.apache.org/projects/flink/flink-docs-master//ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/checkpointing.html] # For Savepoints: [https://ci.apache.org/projects/flink/flink-docs-master//ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19997) Implement an e2e test for sql-client with Confluent Registry Avro format
Dawid Wysakowicz created FLINK-19997: Summary: Implement an e2e test for sql-client with Confluent Registry Avro format Key: FLINK-19997 URL: https://issues.apache.org/jira/browse/FLINK-19997 Project: Flink Issue Type: Test Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Client Affects Versions: 1.12.0 Reporter: Dawid Wysakowicz We should add an e2e test that would verify the format as well as packaging of the format sql jar. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19996) Add end-to-end IT case for Debezium + Kafka + temporal join
Jark Wu created FLINK-19996: --- Summary: Add end-to-end IT case for Debezium + Kafka + temporal join Key: FLINK-19996 URL: https://issues.apache.org/jira/browse/FLINK-19996 Project: Flink Issue Type: Sub-task Components: Connectors / Kafka, Table SQL / Planner Reporter: Jark Wu Fix For: 1.12.0 This is one of the most important use cases motivating FLIP-132. We should add an end-to-end test for it, using the {{source.ts_ms}} metadata as the rowtime attribute of the Kafka Debezium table. This is blocked by FLINK-19276. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19995) [Flink SQL Client] Using the Flink Kafka connector raises more than one exception
zhisheng created FLINK-19995: Summary: [Flink SQL Client] Using the Flink Kafka connector raises more than one exception Key: FLINK-19995 URL: https://issues.apache.org/jira/browse/FLINK-19995 Project: Flink Issue Type: Bug Components: Connectors / Kafka, Table SQL / Client Affects Versions: 1.12.0 Reporter: zhisheng Attachments: image-2020-11-05-17-35-10-103.png, image-2020-11-05-17-37-21-610.png, image-2020-11-05-17-40-05-630.png, image-2020-11-05-17-41-01-319.png When I add flink-sql-connector-kafka_2.11-1.12-SNAPSHOT.jar to lib and run the SQL job, I get the exception shown in picture 2 !image-2020-11-05-17-35-10-103.png|width=658,height=251! !image-2020-11-05-17-37-21-610.png|width=812,height=600! {code:java} [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer {code} When I add flink-connector-kafka_2.11-1.12-SNAPSHOT.jar to lib instead, running it throws another exception !image-2020-11-05-17-41-01-319.png|width=841,height=318! !image-2020-11-05-17-40-05-630.png|width=955,height=581! {code:java} [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer {code} If I add both jars, it returns an exception too {code:java} [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer {code} DDL & SQL: {code:java} CREATE TABLE UserBehavior ( user_id BIGINT, item_id BIGINT, behavior CHAR(2), `time` BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'user_behavior_flink', 'format' = 'json', 'json.ignore-parse-errors' = 'true', 'scan.startup.mode' = 'earliest-offset', 'scan.topic-partition-discovery.interval' = '1' ); select * from UserBehavior;{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19994) All vertices in a DataSet iteration job will be eagerly scheduled
Zhu Zhu created FLINK-19994: --- Summary: All vertices in a DataSet iteration job will be eagerly scheduled Key: FLINK-19994 URL: https://issues.apache.org/jira/browse/FLINK-19994 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.12.0 Reporter: Zhu Zhu Fix For: 1.12.0 After switching to pipelined region scheduling, all vertices in a DataSet iteration job will be eagerly scheduled, which means BLOCKING result consumers can be deployed even before the result finishes, wasting resources. This is because all vertices will be put into one pipelined region if the job contains {{ColocationConstraint}}, see [PipelinedRegionComputeUtil|https://github.com/apache/flink/blob/c0f382f5f0072441ef8933f6993f1c34168004d6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java#L52]. IIUC, this {{makeAllOneRegion()}} behavior was introduced to ensure that co-located iteration heads and tails are restarted together in pipelined region failover. However, given that edges within an iteration will always be PIPELINED ([ref|https://github.com/apache/flink/blob/0523ef6451a93da450c6bdf5dd4757c3702f3962/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java#L1188]), co-located iteration heads and tails will always be in the same region. So I think we can drop the {{PipelinedRegionComputeUtil#makeAllOneRegion()}} code path and build regions in the same way regardless of whether there are co-location constraints. -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: Resource Optimization for Flink Job in AWS EMR Cluster
Hi Deep, you can increase the average CPU load by reducing the number of overall resources. Having fewer slots over which you can distribute the work should increase the resource usage. Cheers, Till On Thu, Nov 5, 2020 at 9:03 AM Prasanna kumar wrote: > Deep, > > 1) Is it a cpu/memory/io intensive job ?? > > Based on that you could allocate resources. > > From the question, if the CPU is not utilised , you could run multiple > containers on the same machine(tm) ... > > Following may not be exact case as yours but to give you an idea. > > Few months back I have run jobs in emr processing 4-8k per second from > kafka with paralleism of 8 doing lightweight transformation where end to > end latency was less than a second (10-50ms). > > I used slots where memory allocated is 4GB and JM memory 1gb. Here > multilple containers ran on the same machine and I got cpu usgae upto 50%. > Earlier it was in single digits when just single container ran on a > machine. > > Prasanna. > > > On Thu 5 Nov, 2020, 12:40 Satyaa Dixit, wrote: > > > Hi Deep, > > > > Thanks for bringing this on table, I'm also facing a similar kind of > issue > > while deploying my flink Job w.r.t resources optimization. > > > > Hi Team, > > > > It would be much appreciated if someone helps us here. > > > > > > Regards, > > Satya > > > > On Wed, Nov 4, 2020 at 6:33 PM DEEP NARAYAN Singh > > wrote: > > > > > Hi All, > > > > > > I am running a flink streaming job in EMR Cluster with parallelism 21 > > > having 500 records per second.But still seeing cpu utilization is > > > approximate 5-8 percent. > > > > > > Below is the long running session command in EMR Cluster having 3 > > instance > > > of type C52xlarge(8vcore, 16 GB memory, AWS resource) > > > > > > *sudo flink-yarn-session -n 3 -s 7 -jm 4168 -tm 8000 -d* > > > > > > Anyone can suggest some configuration to maximize the CPU utilization? > > > And Also what would be the standard utilization of CPU for flink job in > > > order to achieve the minimum latency? > > > > > > Any leads would be appreciated. > > > > > > Thanks, > > > -Deep > > > > > > > > > -- > > -- > > Best Regards > > Satya Prakash > > (M)+91-9845111913 > > >
[jira] [Created] (FLINK-19993) Remove the flink-connector-filesystem module
Kostas Kloudas created FLINK-19993: -- Summary: Remove the flink-connector-filesystem module Key: FLINK-19993 URL: https://issues.apache.org/jira/browse/FLINK-19993 Project: Flink Issue Type: Improvement Components: Connectors / FileSystem Affects Versions: 1.12.0 Reporter: Kostas Kloudas Assignee: Kostas Kloudas The flink-connector-filesystem module contains (only) the deprecated {{BucketingSink}}. The sink has been deprecated since Flink 1.9 in favour of the more recently introduced {{StreamingFileSink}}. For the sake of a clean and more manageable codebase, the community chose to remove it [1] after the discussion in [2]. [1] https://lists.apache.org/thread.html/red2bc04c5d60a6a923ea499a49d7889d1c31ac1987ea6d9a3fe5%40%3Cdev.flink.apache.org%3E [2] https://lists.apache.org/thread.html/re24ceedc02402ac9a6ce1e07b690852320a265b081f416ebac543aaf%40%3Cuser.flink.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19992) Integrate new orc to Hive source
Jingsong Lee created FLINK-19992: Summary: Integrate new orc to Hive source Key: FLINK-19992 URL: https://issues.apache.org/jira/browse/FLINK-19992 Project: Flink Issue Type: Sub-task Components: Connectors / Hive Reporter: Jingsong Lee Assignee: Jingsong Lee Fix For: 1.12.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: [VOTE] Remove flink-connector-filesystem module.
Hi all, Given the current outcome of the voting process, the proposal for removing the "flink-connector-filesystem module" has passed. I will move on with opening a JIRA and actually removing the module and all the related code. Thank you all for voting, Kostas On Fri, Oct 30, 2020 at 5:05 PM Till Rohrmann wrote: > > +1 > > Cheers, > Till > > On Fri, Oct 30, 2020 at 11:54 AM Guowei Ma wrote: > > > +1 > > Best, > > Guowei > > > > > > On Fri, Oct 30, 2020 at 6:02 PM Aljoscha Krettek > > wrote: > > > > > +1 > > > > > > Aljoscha > > > > > > On 29.10.20 09:18, Kostas Kloudas wrote: > > > > Hi all, > > > > > > > > Following the discussion in [1], I would like to start a vote on > > > > removing the flink-connector-filesystem module which includes the > > > > BucketingSink. > > > > > > > > The vote will be open till November 3rd (72h, excluding the weekend) > > > > unless there is an objection or not enough votes. > > > > > > > > Cheers, > > > > Kostas > > > > > > > > [1] > > > > > https://lists.apache.org/thread.html/re24ceedc02402ac9a6ce1e07b690852320a265b081f416ebac543aaf%40%3Cuser.flink.apache.org%3E > > > > > > > > > > > >
[jira] [Created] (FLINK-19991) UnalignedCheckpointITCase#shouldPerformUnalignedCheckpointMassivelyParallel fails on Azure Pipeline
Yingjie Cao created FLINK-19991: --- Summary: UnalignedCheckpointITCase#shouldPerformUnalignedCheckpointMassivelyParallel fails on Azure Pipeline Key: FLINK-19991 URL: https://issues.apache.org/jira/browse/FLINK-19991 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Runtime / Network Reporter: Yingjie Cao Fix For: 1.12.0 UnalignedCheckpointITCase#shouldPerformUnalignedCheckpointMassivelyParallel fails on Azure Pipeline {code:java} java.lang.AssertionError: Expected: <0L> but: was <1809L> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.junit.Assert.assertThat(Assert.java:956) at org.junit.rules.ErrorCollector$1.call(ErrorCollector.java:65) at org.junit.rules.ErrorCollector.checkSucceeds(ErrorCollector.java:78) at org.junit.rules.ErrorCollector.checkThat(ErrorCollector.java:63) at org.junit.rules.ErrorCollector.checkThat(ErrorCollector.java:54) at org.apache.flink.test.checkpointing.UnalignedCheckpointITCase.execute(UnalignedCheckpointITCase.java:189) at org.apache.flink.test.checkpointing.UnalignedCheckpointITCase.shouldPerformUnalignedCheckpointMassivelyParallel(UnalignedCheckpointITCase.java:179) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.Verifier$1.evaluate(Verifier.java:35) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:748) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: Resource Optimization for Flink Job in AWS EMR Cluster
Deep, 1) Is it a cpu/memory/io intensive job ?? Based on that you could allocate resources. >From the question, if the CPU is not utilised , you could run multiple containers on the same machine(tm) ... Following may not be exact case as yours but to give you an idea. Few months back I have run jobs in emr processing 4-8k per second from kafka with paralleism of 8 doing lightweight transformation where end to end latency was less than a second (10-50ms). I used slots where memory allocated is 4GB and JM memory 1gb. Here multilple containers ran on the same machine and I got cpu usgae upto 50%. Earlier it was in single digits when just single container ran on a machine. Prasanna. On Thu 5 Nov, 2020, 12:40 Satyaa Dixit, wrote: > Hi Deep, > > Thanks for bringing this on table, I'm also facing a similar kind of issue > while deploying my flink Job w.r.t resources optimization. > > Hi Team, > > It would be much appreciated if someone helps us here. > > > Regards, > Satya > > On Wed, Nov 4, 2020 at 6:33 PM DEEP NARAYAN Singh > wrote: > > > Hi All, > > > > I am running a flink streaming job in EMR Cluster with parallelism 21 > > having 500 records per second.But still seeing cpu utilization is > > approximate 5-8 percent. > > > > Below is the long running session command in EMR Cluster having 3 > instance > > of type C52xlarge(8vcore, 16 GB memory, AWS resource) > > > > *sudo flink-yarn-session -n 3 -s 7 -jm 4168 -tm 8000 -d* > > > > Anyone can suggest some configuration to maximize the CPU utilization? > > And Also what would be the standard utilization of CPU for flink job in > > order to achieve the minimum latency? > > > > Any leads would be appreciated. > > > > Thanks, > > -Deep > > > > > -- > -- > Best Regards > Satya Prakash > (M)+91-9845111913 >