Re: Maybe a flink bug. Job keeps in FAILING state
Thanks for opening this ticket; I will watch it. Flink does not handle OOM issues specially. I remember we discussed a similar issue before, but I forget what the conclusion was or whether there were other concerns. I am not sure whether it is worth fixing at the moment; maybe Till or Chesnay could give a final decision.

Best,
Zhijiang

--
From: Joshua Fan
Send Time: June 25, 2019 (Tuesday) 11:10
To: zhijiang
Cc: Chesnay Schepler; user; Till Rohrmann
Subject: Re: Maybe a flink bug. Job keeps in FAILING state

Hi Zhijiang,

Thank you for your analysis; I agree with it. The solution may be to let the TM exit, as you mentioned, when any type of OOM occurs, because Flink has no control over a TM once an OOM occurs. I filed a JIRA earlier: https://issues.apache.org/jira/browse/FLINK-12889. I don't know whether it is worth fixing.

Thank you all.

Yours sincerely,
Joshua

On Fri, Jun 21, 2019 at 5:32 PM zhijiang wrote:

Thanks for the reminder, @Chesnay Schepler.

I just looked through the related logs. Actually, all five "Source: ServiceLog" tasks are not in a terminal state from the JM's view. The relevant sequence is as follows:

1. The checkpoint in the task causes an OOM, which calls `Task#failExternally`; we can see the log "Attempting to fail task externally" in the TM.
2. The source task transitions from RUNNING to FAILED and then starts a canceler thread for canceling the task; we can see the log "Triggering cancellation of task" in the TM.
3. When the JM starts to cancel the source tasks, the RPC call `Task#cancelExecution` finds the task already in the FAILED state from step 2; we can see the log "Attempting to cancel task" in the TM.

In the end, all five source tasks are not in terminal states according to the JM log. I guess step 2 might not have created the canceler thread successfully: the root failover was caused by an OOM while creating a native thread in step 1, so creating the canceler thread could fail as well under the same unstable OOM condition. If so, the source task would never be interrupted and would therefore never report to the JM, even though its state had already changed to FAILED.

The other vertex tasks do not trigger `Task#failExternally` in step 1 and only receive the cancel RPC from the JM in step 3. I guess that by this later point the canceler thread could be created successfully after some GCs, so these tasks could be canceled and reported to the JM side.

I think the key problem is that under OOM some behaviors do not match expectations, which can cause problems. Maybe we should handle OOM errors in an extreme way, such as making the TM exit, to avoid the potential issue.

Best,
Zhijiang

--
From: Chesnay Schepler
Send Time: June 21, 2019 (Friday) 16:34
To: zhijiang; Joshua Fan
Cc: user; Till Rohrmann
Subject: Re: Maybe a flink bug. Job keeps in FAILING state

The logs are attached to the initial mail.

Echoing my thoughts from earlier: from the logs it looks as if the TM never even submits the terminal-state RPC calls for several tasks to the JM.

On 21/06/2019 10:30, zhijiang wrote:

Hi Joshua,

If the tasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) were really in CANCELED state on the TM side but in CANCELING state on the JM side, it might indicate that the terminal-state RPC was not received by the JM. I am not sure whether the OOM could cause this unexpected behavior.

In addition, you mentioned that these tasks were still active after the OOM and after they were asked to cancel, so I am not sure which period your attached TM stack covers. It would help if you could provide the corresponding TM log and JM log; from the TM log it is easy to check the tasks' final state.

Best,
Zhijiang

--
From: Joshua Fan
Send Time: June 20, 2019 (Thursday) 11:55
To: zhijiang
Cc: user; Till Rohrmann; Chesnay Schepler
Subject: Re: Maybe a flink bug. Job keeps in FAILING state

Zhijiang,

I did not capture the job UI. The topology is in FAILING state, but the persistentbolt subtasks, as can be seen in the picture attached to the first mail, were all canceled, and the parsebolt subtasks, as described before, had one subtask FAILED and the other subtasks CANCELED. The source subtasks, however, had one subtask (subtask 4/5) CANCELED and the other subtasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) CANCELING, i.e. not in a terminal state.

The subtask status described above is from the JM's view; in the TM's view, all of the source subtasks were FAILED. I don't know why the JM was not notified about this.

As all of the failed statuses were triggered by an OOM (the subtask could not create a native thread when checkpointing), I also dumped the stack of the JVM; it shows the four subtasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) are still active after they threw an OOM and were asked to cancel.
Re: Maybe a flink bug. Job keeps in FAILING state
Hi Zhijiang,

Thank you for your analysis; I agree with it. The solution may be to let the TM exit, as you mentioned, when any type of OOM occurs, because Flink has no control over a TM once an OOM occurs. I filed a JIRA earlier: https://issues.apache.org/jira/browse/FLINK-12889. I don't know whether it is worth fixing.

Thank you all.

Yours sincerely,
Joshua

On Fri, Jun 21, 2019 at 5:32 PM zhijiang wrote:

> Thanks for the reminder, @Chesnay Schepler.
>
> I just looked through the related logs. Actually, all five "Source: ServiceLog" tasks are not in a terminal state from the JM's view. The relevant sequence is as follows:
>
> 1. The checkpoint in the task causes an OOM, which calls `Task#failExternally`; we can see the log "Attempting to fail task externally" in the TM.
> 2. The source task transitions from RUNNING to FAILED and then starts a canceler thread for canceling the task; we can see the log "Triggering cancellation of task" in the TM.
> 3. When the JM starts to cancel the source tasks, the RPC call `Task#cancelExecution` finds the task already in the FAILED state from step 2; we can see the log "Attempting to cancel task" in the TM.
>
> In the end, all five source tasks are not in terminal states according to the JM log. I guess step 2 might not have created the canceler thread successfully: the root failover was caused by an OOM while creating a native thread in step 1, so creating the canceler thread could fail as well under the same unstable OOM condition. If so, the source task would never be interrupted and would therefore never report to the JM, even though its state had already changed to FAILED.
>
> The other vertex tasks do not trigger `Task#failExternally` in step 1 and only receive the cancel RPC from the JM in step 3. I guess that by this later point the canceler thread could be created successfully after some GCs, so these tasks could be canceled and reported to the JM side.
>
> I think the key problem is that under OOM some behaviors do not match expectations, which can cause problems. Maybe we should handle OOM errors in an extreme way, such as making the TM exit, to avoid the potential issue.
>
> Best,
> Zhijiang
>
> --
> From: Chesnay Schepler
> Send Time: June 21, 2019 (Friday) 16:34
> To: zhijiang; Joshua Fan <joshuafat...@gmail.com>
> Cc: user; Till Rohrmann
> Subject: Re: Maybe a flink bug. Job keeps in FAILING state
>
> The logs are attached to the initial mail.
>
> Echoing my thoughts from earlier: from the logs it looks as if the TM never even submits the terminal-state RPC calls for several tasks to the JM.
>
> On 21/06/2019 10:30, zhijiang wrote:
> Hi Joshua,
>
> If the tasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) were really in CANCELED state on the TM side but in CANCELING state on the JM side, it might indicate that the terminal-state RPC was not received by the JM. I am not sure whether the OOM could cause this unexpected behavior.
>
> In addition, you mentioned that these tasks were still active after the OOM and after they were asked to cancel, so I am not sure which period your attached TM stack covers. It would help if you could provide the corresponding TM log and JM log; from the TM log it is easy to check the tasks' final state.
>
> Best,
> Zhijiang
> --
> From: Joshua Fan
> Send Time: June 20, 2019 (Thursday) 11:55
> To: zhijiang
> Cc: user; Till Rohrmann; Chesnay Schepler
> Subject: Re: Maybe a flink bug. Job keeps in FAILING state
>
> Zhijiang,
>
> I did not capture the job UI. The topology is in FAILING state, but the persistentbolt subtasks, as can be seen in the picture attached to the first mail, were all canceled, and the parsebolt subtasks, as described before, had one subtask FAILED and the other subtasks CANCELED. The source subtasks, however, had one subtask (subtask 4/5) CANCELED and the other subtasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) CANCELING, i.e. not in a terminal state.
>
> The subtask status described above is from the JM's view; in the TM's view, all of the source subtasks were FAILED. I don't know why the JM was not notified about this.
>
> As all of the failed statuses were triggered by an OOM (the subtask could not create a native thread when checkpointing), I also dumped the stack of the JVM; it shows the four subtasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) are still active after they threw an OOM and were asked to cancel. I attached the jstack file in this email.
>
> Yours sincerely,
> Joshua
>
> On Wed, Jun 19, 2019 at 4:40 PM zhijiang wrote:
> As long as one task is in canceling state, the job status might still be in canceling state.
>
> @Joshua Can you confirm that all of the tasks in the topology were already in a terminal state such as FAILED or CANCELED?
>
> Best,
> Zhijiang
>
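PS: Until something like FLINK-12889 is resolved, a stopgap we are thinking about on our side (only a sketch and an assumption on my part: it needs JDK 8u92+ on the TMs, and I have not verified that the flag also fires for the "unable to create new native thread" variant of OOM) is to let the JVM itself kill the task manager process on any OutOfMemoryError, via env.java.opts.taskmanager in flink-conf.yaml:

# flink-conf.yaml on the task managers
# terminate the whole JVM on OOM instead of leaving the TM in an undefined
# state where even the canceler thread may fail to start
env.java.opts.taskmanager: -XX:+ExitOnOutOfMemoryError

With that, the job could fail over cleanly once the TM comes back (YARN restarts the container; a standalone setup would need an external supervisor to bring the TM back).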
Re: Building some specific modules in flink
Hi Syed,

You can use 'mvn clean package -pl :flink-streaming-java_2.11 -DskipTests -am' to build the flink-streaming-java module together with the modules it depends on, including flink-runtime. If the "already built binary" means the flink-dist-*.jar package, this mvn command will not update the dist jar. As far as I know, a quick solution is to use the `jar uf` [1] command to update the dist jar with your changed classes. Otherwise, you need to build the flink-dist module from scratch.

[1] https://docs.oracle.com/javase/tutorial/deployment/jar/update.html

Best,
Yun Tang

From: syed
Sent: Tuesday, June 25, 2019 9:14
To: user@flink.apache.org
Subject: Building some specific modules in flink

Hi;
I am trying to modify some core functionality of Flink for my own thorough understanding of Flink. I already built Flink from source; now I would like to build only the few modules I have modified. Is this possible, or do I have to build Flink in full (all modules) every time? It takes me about 30-35 minutes to build Flink in full.

Specifically, I have modified some classes in the *flink-streaming-java* and *flink-runtime* modules. I would like to build only these two modules and integrate them into the already built Flink (all modules). I already tried the -pl option with mvn; it installs these modules, but the changes are not reflected in the already built binaries.

Please guide me on how I can do this.

Kind regards;
syed

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
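To make the `jar uf` route concrete, patching a single recompiled class into the dist jar could look roughly like this (only a sketch; the FLINK_HOME path, the dist jar version, and Task.class are placeholders you would adapt to your own checkout and changes):

# rebuild only the modified modules and their upstream dependencies
mvn clean package -pl :flink-streaming-java_2.11,:flink-runtime_2.11 -DskipTests -am

# patch a recompiled class into the existing dist jar, keeping its package path
cd flink-runtime/target/classes
jar uf $FLINK_HOME/lib/flink-dist_2.11-1.8.0.jar org/apache/flink/runtime/taskmanager/Task.class

Running `jar uf` from the target/classes directory is what preserves the org/apache/flink/... entry names inside the jar.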
Re: Building some specific modules in flink
You need to specify flink-dist in -pl. The flink-dist module builds the Flink binary distribution.

syed wrote on Tue, Jun 25, 2019 at 9:14 AM:

> Hi;
> I am trying to modify some core functionality of Flink for my own thorough understanding of Flink. I already built Flink from source; now I would like to build only the few modules I have modified. Is this possible, or do I have to build Flink in full (all modules) every time? It takes me about 30-35 minutes to build Flink in full.
>
> Specifically, I have modified some classes in the *flink-streaming-java* and *flink-runtime* modules. I would like to build only these two modules and integrate them into the already built Flink (all modules). I already tried the -pl option with mvn; it installs these modules, but the changes are not reflected in the already built binaries.
> Please guide me on how I can do this.
>
> Kind regards;
> syed
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

--
Best Regards

Jeff Zhang
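For example (a sketch; the _2.11 artifact suffixes come from my checkout, adjust them to your Scala version):

mvn clean install -DskipTests -pl :flink-streaming-java_2.11,:flink-runtime_2.11,:flink-dist_2.11

This rebuilds the two modified modules and then repackages flink-dist against them, so the refreshed distribution should show up under flink-dist/target (the build-target symlink at the repository root points there).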
Error checkpointing to S3 like FS (EMC ECS)
Hi,

I am using Flink version 1.7.2, and I am trying to use the S3-like object storage EMC ECS (https://www.emc.com/techpubs/ecs/ecs_s3_supported_features-1.htm). Not all S3 APIs are supported by EMC ECS according to this document. Here is my config:

s3.endpoint: SU73ECSG1P1d.***.COM
s3.access-key: vdna_np_user
security.ssl.rest.enabled: false
web.timeout: 1
s3.secret-key: J***

I can access this bucket from the s3cmd client. I set the state backend from my Scala application:

env.setStateBackend(new FsStateBackend("s3://aip-featuretoolkit/checkpoints/"))

However, when I run my application I get this exception:

ClientException: Unable to execute HTTP request: aip-featuretoolkit.SU73ECSG1P1d.VISA.COM: Unable to execute HTTP request: aip-featuretoolkit.SU73ECSG1P1d.VISA.COM
1336 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:255)
1337 at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:498)
1338 at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:345)
1339 at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
1340 at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1173)
1341 at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1153)
1342 at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
1343 at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
1344 ... 10 more
1345 Caused by: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSClientIOException: doesBucketExist on aip-featuretoolkit: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: aip-featuretoolkit.SU73ECSG1P1d.VISA.COM: Unable to execute HTTP request: aip-featuretoolkit.SU73ECSG1P1d.VISA.COM
1346 at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:177)
1347 at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
1348 at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
1349 at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
1350 at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
1351 at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
1352 at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:372)
1353 at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:308)
1354 at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:125)
1355 at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
1356 at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
1357 at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
1358 at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
1359 at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
1360 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
1361 ... 17 more
1362 Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: aip-featuretoolkit.SU73ECSG1P1d.VISA.COM
1363 at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114)
1364 at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
1365 at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
1366 at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
1367 at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
1368 at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
1369 at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
1370 at org.apache.flink.fs
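One more thing I am planning to try (just a guess on my side, I have not confirmed it against the ECS documentation): since the host that fails to resolve is the bucket name prepended to the endpoint, maybe ECS needs path-style requests instead of virtual-hosted-style ones. Something like this in flink-conf.yaml, assuming flink-s3-fs-hadoop forwards s3.* keys to the corresponding fs.s3a.* options:

s3.endpoint: SU73ECSG1P1d.***.COM
s3.path.style.access: true

Can someone confirm whether that key is picked up by the shaded s3a filesystem?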
Building some specific modules in flink
Hi;
I am trying to modify some core functionality of Flink for my own thorough understanding of Flink. I already built Flink from source; now I would like to build only the few modules I have modified. Is this possible, or do I have to build Flink in full (all modules) every time? It takes me about 30-35 minutes to build Flink in full.

Specifically, I have modified some classes in the *flink-streaming-java* and *flink-runtime* modules. I would like to build only these two modules and integrate them into the already built Flink (all modules). I already tried the -pl option with mvn; it installs these modules, but the changes are not reflected in the already built binaries.

Please guide me on how I can do this.

Kind regards;
syed

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Unable to set S3 like object storage for state backend.
Hi Ken,

Thanks for reaching out. I created a compliant bucket with the name aip-featuretoolkit. I now get the exception "Unable to execute HTTP request: aip-featuretoolkit.SU73ECSG1P1d.***.COM: Name or service not known" from org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.class, line 56.

Here is my config from the flink-conf.yaml file:

s3.endpoint: SU73ECSG1P1d.***.COM
s3.access-key: vdna_np_user
security.ssl.rest.enabled: false
web.timeout: 1
s3.secret-key: J***

I have not supplied the port in the config file. Does it internally use 9021? Also, I am running my application as a different user than the one specified in s3.access-key. Does that matter?

Thanks,
Vishwas

On Thu, Jun 20, 2019 at 5:06 PM Ken Krugler wrote:
>
> Hi Vishwas,
>
> It might be that you’ve got a legacy bucket name (“aip_featuretoolkit”), as AWS no longer allows bucket names to contain an underscore.
>
> I’m guessing that the Hadoop S3 code is trying to treat your path as a valid URI, but the bucket name doesn’t conform, and thus you get the "null uri host” issue.
>
> Could you try with a compliant bucket name?
>
> — Ken
>
> On Jun 20, 2019, at 2:46 PM, Vishwas Siravara wrote:
>
> Hi,
> I am using Flink version 1.7.2, and I am trying to use the S3-like object storage EMC ECS (https://www.emc.com/techpubs/ecs/ecs_s3_supported_features-1.htm).
>
> I am using the flink-s3-fs-hadoop-1.7.2.jar file as a dependency for the s3 filesystem; I have placed it under the lib folder and it is available to Flink on its classpath.
>
> My flink-conf.yaml looks like this:
>
> s3.endpoint: SU73ECSG1P1d.***.COM:9021
> s3.access-key: vdna_np_user
> security.ssl.rest.enabled: false
> web.timeout: 1
> s3.secret-key: J***
>
> And my code for the state backend is like this:
>
> env.setStateBackend(new FsStateBackend("s3://aip_featuretoolkit/checkpoints/"))
>
> I have a bucket called aip_featuretoolkit in my s3 instance. I can connect to s3 from the s3 command line utilities. However, I cannot checkpoint with this configuration in Flink. I get the following error message:
>
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult.
> at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:643)
> at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
> at com.visa.flink.job.DruidStreamJob$.runJob(DruidStreamJob.scala:62)
> at com.visa.flink.cli.CliFlinkDruid.run(CliFlinkDruid.scala:19)
> at com.visa.flink.cli.Main$.main(Main.scala:22)
> at com.visa.flink.cli.Main.main(Main.scala)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
> at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$submitJob$2(Dispatcher.java:267)
> at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
> at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> at org.apache.flink.util.function.CheckedSupplier.lamb
Process Function's timers "postponing"
Dear Community,

I am using Flink (processing-time) timers along with a ProcessFunction. What I would like to do is "postpone" any already registered timer for a given key. I would like to do this because I might process plenty of events in a row (think of it as a session), so that I can trigger the computation "just after" this session somehow stops.

I thought about deleting any existing timer, but as far as I understand I would need to know the previous timer's triggering time, which I guess is not possible for me since I use processing-time timers.

I also read [1], but I am not really able to understand whether it helps me; for instance, I don't understand the sentence "Since Flink maintains only one timer per key and timestamp...". Does this imply that a new processing-time timer will automatically overwrite a previously existing one?

Thank you for your precious help,

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timer-coalescing

--

*Andrea Spina*
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT
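To make the question more concrete, this is roughly the pattern I had in mind (only a sketch; Event, Result, and the 30-second gap are made-up placeholders): keep the timestamp of the last registered timer in keyed state, delete that timer whenever a new element arrives, and register a fresh one.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

case class Event(key: String, payload: String)
case class Result(key: String)

class SessionEndFunction extends KeyedProcessFunction[String, Event, Result] {

  // timestamp of the currently registered processing-time timer for this key, if any
  private var pendingTimer: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    pendingTimer = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("pending-timer", classOf[java.lang.Long]))
  }

  override def processElement(
      event: Event,
      ctx: KeyedProcessFunction[String, Event, Result]#Context,
      out: Collector[Result]): Unit = {
    val previous = pendingTimer.value()
    if (previous != null) {
      // "postpone": drop the previously registered timer before setting a new one
      ctx.timerService().deleteProcessingTimeTimer(previous)
    }
    val newTimer = ctx.timerService().currentProcessingTime() + 30 * 1000L
    ctx.timerService().registerProcessingTimeTimer(newTimer)
    pendingTimer.update(newTimer)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, Event, Result]#OnTimerContext,
      out: Collector[Result]): Unit = {
    // no element arrived within the gap, so the "session" is considered over
    pendingTimer.clear()
    out.collect(Result(ctx.getCurrentKey))
  }
}

The point is that deleting a timer needs its exact timestamp, which is why the timestamp has to be stored in keyed state in the first place.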
Re: Linkage Error RocksDB and flink-1.6.4
Hi Shu Su,

The first point exactly pinpoints the issue I bumped into: I forgot to set that dependency to "provided". Thank you!

On Mon, Jun 24, 2019 at 05:35, Shu Su wrote:

> Hi Andrea,
>
> Actually it is caused by Flink's classloading. Flink uses the parent classloader to load the jar first, and when you then use it in your user code, the user-code classloader loads it again, which raises the error. There are two solutions.
>
> 1. Add the scope "provided" to the dependency in your maven pom.xml:
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>   <version>${flink_version}</version>
>   <scope>provided</scope>
> </dependency>
>
> 2. Set classloader.resolve-order: parent-first in flink-conf.yaml.
>
> Hope this helps you.
>
> Thanks,
> Simon
>
> On 06/24/2019 11:27, Yun Tang wrote:
>
> Hi Andrea,
>
> Since I have not written Scala for a while, I wonder why you need to instantiate your ColumnFamilyOptions, BlockBasedTableConfig and DBOptions on the JM side. As far as I can see, you could instantiate them on the TM side with code like:
>
> val rocksdbConfig = new OptionsFactory() {
>   override def createDBOptions(currentOptions: DBOptions): DBOptions =
>     currentOptions.setIncreaseParallelism(properties.threadNo)
>
>   override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions =
>     currentOptions.setWriteBufferSize(MemorySize.parseBytes(properties.writeBufferSize))
> }
>
> You just need to serialize the properties via the closure to the TMs. Hope this helps you.
>
> Best,
> Yun Tang
>
> --
> *From:* Andrea Spina
> *Sent:* Monday, June 24, 2019 2:20
> *To:* user
> *Subject:* Linkage Error RocksDB and flink-1.6.4
>
> Dear community,
> I am running a Flink job backed by RocksDB, version 1.6.4 and Scala 2.11. At job startup the following exception happens (it is recorded by the Job Manager):
>
> Caused by: java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader) previously initiated loading for a different type with name "org/rocksdb/DBOptions"
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
> at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:126)
>
> For this job, I programmatically set some RocksDB options by using the code appended below. Can anybody help with this?
> Thank you so much,
> Andrea
>
> import org.apache.flink.configuration.MemorySize
> import org.apache.flink.contrib.streaming.state.{OptionsFactory, PredefinedOptions, RocksDBStateBackend}
> import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}
>
> object ConfigurableRocksDB {
>
>   lazy val columnOptions = new ColumnFamilyOptions() with Serializable
>   lazy val tableConfig   = new BlockBasedTableConfig() with Serializable
>   lazy val dbOptions     = new DBOptions() with Serializable
>
>   def configureStateBackendRocksDB(properties: FlinkDeployment): RocksDBStateBackend = {
>     properties.threadNo.foreach(dbOptions.setIncreaseParallelism)
>
>     properties.blockSize.foreach(bs => tableConfig.setBlockSize(MemorySize.parseBytes(bs)))
>     properties.cacheSize.foreach(cs => tableConfig.setBlockCacheSize(MemorySize.parseBytes(cs)))
>     properties.cacheIndicesAndFilters.foreach(cif => if (cif) tableConfig.cacheIndexAndFilterBlocks())
>     properties.writeBufferSize.foreach(wbs => columnOptions.setWriteBufferSize(MemorySize.parseBytes(wbs)))
>
>     columnOptions.setTableFormatConfig(tableConfig)
>     properties.writeBufferToMerge.foreach(bm => columnOptions.setMinWriteBufferNumberToMerge(bm))
>     properties.writeBufferCount.foreach(bc => columnOptions.setMaxWriteBufferNumber(bc))
>     properties.optimizeFilterForHits.foreach(op => if (op) columnOptions.optimizeFiltersForHits())
>
>     val rocksdbConfig = new OptionsFactory() {
>       override def createDBOptions(currentOptions: DBOptions): DBOptions = dbOptions
>       override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions = columnOptions
>     }
>
>     val stateBE =
>       new RocksDBStateBackend(properties.checkpo
Re: Flink Kafka consumer with low latency requirement
I posted my related observation in a separate thread:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Does-making-synchronize-call-might-choke-the-whole-pipeline-tc28383.html

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Flink Kafka consumer with low latency requirement
private static void doWork(long tid) throws InterruptedException {
    if (!sortedTid.contains(tid)) {
        sortedTid.add(tid);
    }
    // simulate a straggler: make the thread with the lowest tid a slow processor
    if (sortedTid.first() == tid) {
        if (counter++ == 1000) {
            Thread.sleep(60000);   // sleep 60 seconds once the counter reaches 1000
        }
        Thread.sleep(20);
    } else {
        Thread.sleep(20);
    }
}

Just for testing purposes, the thread with the lowest tid sleeps 60 seconds when the counter reaches 1000. Will the 'sleep' cause any issues?

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Flink Kafka consumer with low latency requirement
Hi,

What kind of function do you use to implement the operator that makes the blocking call? Did you have a look at the AsyncIO operator? It was designed for exactly such use cases: it issues multiple asynchronous requests to an external service and waits for the responses.

Best,
Fabian

On Mon, Jun 24, 2019 at 17:01, xwang355 wrote:

> Fabian,
>
> Does the above stack trace look like a deadlock?
>
> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:539)
> - locked <0x0007baf84040> (a java.util.ArrayDeque)
> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
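A rough sketch of what that could look like in Scala (callRestService is a made-up placeholder for your blocking call, and the commented usage line assumes a kafkaStream of type DataStream[String]):

import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}

class EnrichViaRest extends RichAsyncFunction[String, String] {

  // run the blocking call on a separate pool so the task thread is never blocked
  @transient implicit lazy val ec: ExecutionContext = ExecutionContext.global

  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
    Future(callRestService(input)).onComplete {
      case Success(v) => resultFuture.complete(Iterable(v))
      case Failure(t) => resultFuture.completeExceptionally(t)
    }
  }

  private def callRestService(in: String): String = in // placeholder for the real REST call
}

// usage: up to 100 requests in flight per parallel instance, 60 second timeout per request
// val enriched = AsyncDataStream.unorderedWait(kafkaStream, new EnrichViaRest, 60, TimeUnit.SECONDS, 100)

With unorderedWait, results are emitted as soon as they arrive, so a single slow request does not hold back the others.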
Re: Flink Kafka consumer with low latency requirement
Fabian,

Does the above stack trace look like a deadlock?

at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:539)
- locked <0x0007baf84040> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Flink Kafka consumer with low latency requirement
Fabian,

Thank you for replying. If I understand your previous comment correctly, I set up a consumer with parallelism 1 and connect a worker task with parallelism 2. If worker thread one makes a blocking call and gets stuck for 60s, the consumer thread should continue fetching from the partition and feeding thread two.

From my reading of the Flink documentation, if checkpointing is enabled, the consumer should commit its own internal state back to Kafka to show progress to external monitoring tools. If that is the case, during the 60s when thread one is stuck, checkpoints should all succeed and thread two should keep chugging along merrily, even though the highest committed offset is one less than the offset held by thread one. After 60s, I should see a huge jump in the monitoring tool, because thread one has released its offset and all offsets consumed by thread two during those 60s can be committed.

However, what I have observed is that as soon as thread one gets stuck, checkpointing is choked, the consumer thread stops feeding thread two, and the whole pipeline becomes stagnant.

Could you please help me understand this behavior? Thanks again.

Ben

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Unexpected behavior from interval join in Flink
Ah, that's great! Thanks for letting us know :-)

On Mon, 24 Jun 2019 at 11:33, Wouter Zorgdrager <w.d.zorgdra...@tudelft.nl> wrote:

> Hi Fabian,
>
> Thanks for your reply. I managed to resolve this issue. Actually this behavior was not so unexpected: I messed up by using xStream as the 'base' while I needed to use yStream as the 'base', i.e. yStream.element - 60 min <= xStream.element <= yStream.element + 30 min. Interchanging both datastreams fixed the issue.
>
> Thanks anyway.
>
> Cheers, Wouter
>
> On Mon, 24 Jun 2019 at 11:22, Fabian Hueske wrote:
>
>> Hi Wouter,
>>
>> I'm not sure what is going wrong there, but something you could try is to use a custom watermark assigner and always return a watermark of 0.
>> When a source finishes serving its watermarks, it emits a final Long.MAX_VALUE watermark.
>> Hence the join should consume all events and store them in state. When both sources are finished, it will start to join the data and clean up the state.
>> This test would show whether there is any issue with late data.
>>
>> Best, Fabian
>>
>> On Fri, 21 Jun 2019 at 15:32, Wouter Zorgdrager <w.d.zorgdra...@tudelft.nl> wrote:
>>
>>> Anyone have some leads on this issue? I have been looking into the IntervalJoinOperator code, but that didn't really help. My intuition is that the data is rejected because of lateness, but that still confuses me since I'm sure that both datastreams have monotonically increasing timestamps.
>>>
>>> Thx, Wouter
>>>
>>> On Mon, 17 Jun 2019 at 13:20, Wouter Zorgdrager <w.d.zorgdra...@tudelft.nl> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm experiencing some unexpected behavior using an interval join in Flink.
>>>> I'm dealing with two data sets, let's call them X and Y. They are finite (10k elements), but I interpret them as DataStreams. The data needs to be joined for enrichment purposes. I use event time, and I know (because I generated the data myself) that the timestamp of an element of Y is always between -60 minutes and +30 minutes of the element with the same key in set X. Both datasets are in order (in terms of timestamps), equal in size, share a common key, and parallelism is set to 1 throughout the whole program.
>>>>
>>>> The code to join looks something like this:
>>>>
>>>> xStream
>>>>   .assignAscendingTimestamps(_.date.getTime)
>>>>   .keyBy(_.commonKey)
>>>>   .intervalJoin(
>>>>     yStream
>>>>       .assignAscendingTimestamps(_.date.getTime)
>>>>       .keyBy(_.commonKey))
>>>>   .between(Time.minutes(-60), Time.minutes(30))
>>>>   .process(new ProcessJoinFunction[X, Y, String] {
>>>>     override def processElement(
>>>>         left: X,
>>>>         right: Y,
>>>>         ctx: ProcessJoinFunction[X, Y, String]#Context,
>>>>         out: Collector[String]): Unit = {
>>>>       out.collect(left + ":" + right)
>>>>     }
>>>>
>>>> However, about 30% of the data is not joined. Is there a proper way to debug this? For instance, in windows you can side-output late data. Is there a possibility to side-output unjoinable data?
>>>>
>>>> Thx a lot,
>>>> Wouter
Re: Unexpected behavior from interval join in Flink
Hi Fabian,

Thanks for your reply. I managed to resolve this issue. Actually this behavior was not so unexpected: I messed up by using xStream as the 'base' while I needed to use yStream as the 'base', i.e. yStream.element - 60 min <= xStream.element <= yStream.element + 30 min. Interchanging both datastreams fixed the issue.

Thanks anyway.

Cheers, Wouter

On Mon, 24 Jun 2019 at 11:22, Fabian Hueske wrote:

> Hi Wouter,
>
> I'm not sure what is going wrong there, but something you could try is to use a custom watermark assigner and always return a watermark of 0.
> When a source finishes serving its watermarks, it emits a final Long.MAX_VALUE watermark.
> Hence the join should consume all events and store them in state. When both sources are finished, it will start to join the data and clean up the state.
> This test would show whether there is any issue with late data.
>
> Best, Fabian
>
> On Fri, 21 Jun 2019 at 15:32, Wouter Zorgdrager <w.d.zorgdra...@tudelft.nl> wrote:
>
>> Anyone have some leads on this issue? I have been looking into the IntervalJoinOperator code, but that didn't really help. My intuition is that the data is rejected because of lateness, but that still confuses me since I'm sure that both datastreams have monotonically increasing timestamps.
>>
>> Thx, Wouter
>>
>> On Mon, 17 Jun 2019 at 13:20, Wouter Zorgdrager <w.d.zorgdra...@tudelft.nl> wrote:
>>
>>> Hi all,
>>>
>>> I'm experiencing some unexpected behavior using an interval join in Flink.
>>> I'm dealing with two data sets, let's call them X and Y. They are finite (10k elements), but I interpret them as DataStreams. The data needs to be joined for enrichment purposes. I use event time, and I know (because I generated the data myself) that the timestamp of an element of Y is always between -60 minutes and +30 minutes of the element with the same key in set X. Both datasets are in order (in terms of timestamps), equal in size, share a common key, and parallelism is set to 1 throughout the whole program.
>>>
>>> The code to join looks something like this:
>>>
>>> xStream
>>>   .assignAscendingTimestamps(_.date.getTime)
>>>   .keyBy(_.commonKey)
>>>   .intervalJoin(
>>>     yStream
>>>       .assignAscendingTimestamps(_.date.getTime)
>>>       .keyBy(_.commonKey))
>>>   .between(Time.minutes(-60), Time.minutes(30))
>>>   .process(new ProcessJoinFunction[X, Y, String] {
>>>     override def processElement(
>>>         left: X,
>>>         right: Y,
>>>         ctx: ProcessJoinFunction[X, Y, String]#Context,
>>>         out: Collector[String]): Unit = {
>>>       out.collect(left + ":" + right)
>>>     }
>>>
>>> However, about 30% of the data is not joined. Is there a proper way to debug this? For instance, in windows you can side-output late data. Is there a possibility to side-output unjoinable data?
>>>
>>> Thx a lot,
>>> Wouter
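In case it helps anyone who runs into the same confusion, the corrected direction looks roughly like this (it reuses the stream and field names from my original snippet quoted above; only the roles of the two streams and the type parameters are swapped):

yStream
  .assignAscendingTimestamps(_.date.getTime)
  .keyBy(_.commonKey)
  .intervalJoin(
    xStream
      .assignAscendingTimestamps(_.date.getTime)
      .keyBy(_.commonKey))
  .between(Time.minutes(-60), Time.minutes(30))
  .process(new ProcessJoinFunction[Y, X, String] {
    override def processElement(
        left: Y,
        right: X,
        ctx: ProcessJoinFunction[Y, X, String]#Context,
        out: Collector[String]): Unit = {
      out.collect(right + ":" + left)
    }
  })

With yStream as the base, between(Time.minutes(-60), Time.minutes(30)) now expresses exactly yStream.element - 60 min <= xStream.element <= yStream.element + 30 min.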
Re: Unexpected behavior from interval join in Flink
Hi Wouter,

I'm not sure what is going wrong there, but something you could try is to use a custom watermark assigner and always return a watermark of 0.
When a source finishes serving its watermarks, it emits a final Long.MAX_VALUE watermark.
Hence the join should consume all events and store them in state. When both sources are finished, it will start to join the data and clean up the state.
This test would show whether there is any issue with late data.

Best, Fabian

On Fri, 21 Jun 2019 at 15:32, Wouter Zorgdrager <w.d.zorgdra...@tudelft.nl> wrote:

> Anyone have some leads on this issue? I have been looking into the IntervalJoinOperator code, but that didn't really help. My intuition is that the data is rejected because of lateness, but that still confuses me since I'm sure that both datastreams have monotonically increasing timestamps.
>
> Thx, Wouter
>
> On Mon, 17 Jun 2019 at 13:20, Wouter Zorgdrager <w.d.zorgdra...@tudelft.nl> wrote:
>
>> Hi all,
>>
>> I'm experiencing some unexpected behavior using an interval join in Flink.
>> I'm dealing with two data sets, let's call them X and Y. They are finite (10k elements), but I interpret them as DataStreams. The data needs to be joined for enrichment purposes. I use event time, and I know (because I generated the data myself) that the timestamp of an element of Y is always between -60 minutes and +30 minutes of the element with the same key in set X. Both datasets are in order (in terms of timestamps), equal in size, share a common key, and parallelism is set to 1 throughout the whole program.
>>
>> The code to join looks something like this:
>>
>> xStream
>>   .assignAscendingTimestamps(_.date.getTime)
>>   .keyBy(_.commonKey)
>>   .intervalJoin(
>>     yStream
>>       .assignAscendingTimestamps(_.date.getTime)
>>       .keyBy(_.commonKey))
>>   .between(Time.minutes(-60), Time.minutes(30))
>>   .process(new ProcessJoinFunction[X, Y, String] {
>>     override def processElement(
>>         left: X,
>>         right: Y,
>>         ctx: ProcessJoinFunction[X, Y, String]#Context,
>>         out: Collector[String]): Unit = {
>>       out.collect(left + ":" + right)
>>     }
>>
>> However, about 30% of the data is not joined. Is there a proper way to debug this? For instance, in windows you can side-output late data. Is there a possibility to side-output unjoinable data?
>>
>> Thx a lot,
>> Wouter
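PS: For the watermark experiment I mean something as small as this (a sketch; ZeroWatermarkAssigner is just a name I made up here):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class ZeroWatermarkAssigner[T](extractTs: T => Long) extends AssignerWithPeriodicWatermarks[T] {
  // use the element's own timestamp, but never advance the watermark on our own;
  // the final Long.MAX_VALUE watermark is still emitted when the source finishes
  override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = extractTs(element)
  override def getCurrentWatermark: Watermark = new Watermark(0L)
}

// used instead of assignAscendingTimestamps, e.g.:
// xStream.assignTimestampsAndWatermarks(new ZeroWatermarkAssigner[X](_.date.getTime))

If the join result is then complete, the missing 30% in the original job were most likely dropped as late data.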
Re: Flink Kafka consumer with low latency requirement
Hi Ben,

Flink's Kafka consumers track their progress independently of any worker. They keep track of the reading offsets themselves (committing progress to Kafka is optional and only necessary to have progress monitoring in Kafka's metrics). As soon as a consumer reads and forwards an event, it is considered to be read. This means the progress of the downstream workers does not influence the progress tracking at all.

In the case of a topic with a single partition, you can use a consumer with parallelism 1 and connect a worker task with a higher parallelism to it. The single consumer task will send the read events round-robin to the worker tasks.

Best, Fabian

On Fri, Jun 21, 2019 at 05:48, wang xuchen wrote:

>
> Dear Flink experts,
>
> I am experimenting with Flink for a use case with tight latency requirements.
>
> A Stack Overflow article suggests that I can use setParallelism(n) to process a Kafka partition in a multi-threaded way. My understanding is that there is still one Kafka consumer per partition, but by using setParallelism I can spin up multiple worker threads to process the messages read from the consumer.
>
> And according to Fabian's comments in this link:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Does-Flink-Kafka-connector-has-max-pending-offsets-concept-td28119.html
> Flink is able to manage the offsets correctly (commit them in the right order).
>
> Here is my question. Let's say there is a Kafka topic with only one partition, and I set up a consumer with setParallelism(2). Hypothetically, the worker threads call out to a REST service which may get slow or stuck periodically. I want to make sure that the consumer overall keeps making progress even in the face of a 'slow worker'. In other words, I'd like to have multiple pending but uncommitted offsets from the fast worker even when the other worker is stuck. Is there such a knob to tune in Flink?
>
> From my own experiment, I use the Kafka consumer group tool to monitor the offset lag; as soon as one worker thread gets stuck, the other cannot make any progress either. I really want the fast worker to still make progress to a certain extent. For this use case, exactly-once processing is not required.
>
> Thanks for helping.
> Ben
>
>
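In code that is roughly the following (a sketch; the topic, brokers, and map body are placeholders, and depending on your connector version the class might be FlinkKafkaConsumer011 instead of FlinkKafkaConsumer):

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object SinglePartitionParallelWorkers {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "broker:9092")
    props.setProperty("group.id", "low-latency-test")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env
      .addSource(new FlinkKafkaConsumer[String]("my-topic", new SimpleStringSchema(), props))
      .setParallelism(1)   // one source task reads the single partition
      .rebalance           // distribute records round-robin to the workers
      .map { record =>
        // the potentially slow or blocking call goes here
        record
      }
      .setParallelism(2)   // two worker subtasks share the load

    env.execute("single-partition-with-parallel-workers")
  }
}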