Re: question on dataSource.collect() on reading states from a savepoint file
Thanks Bastien. I will check it out.

Antonio.

On Thu, Feb 10, 2022 at 11:59 AM bastien dine wrote:
> I haven't used S3 with Flink, but according to this doc:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/
> you can set up S3 pretty easily and use it with s3://path/to/your/file
> with a write sink.
> The page talks about DataStream, but it should work with DataSet (
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/dataset/overview/#data-sinks
> ).
>
> Maybe someone else will have more information about an S3 DataSet sink.
>
> Regards,
>
> On Thu, Feb 10, 2022 at 20:52, Antonio Si wrote:
>
>> Thanks Bastien. Can you point to an example of using a sink, as we are
>> planning to write to S3?
>>
>> Thanks again for your help.
>>
>> Antonio.
>>
>> On Thu, Feb 10, 2022 at 11:49 AM bastien dine wrote:
>>
>>> Hello Antonio,
>>>
>>> The .collect() method should be used with caution, as it collects the
>>> DataSet (multiple partitions on multiple TMs) into a single List on
>>> the JM (so in memory).
>>> Unless you have a lot of RAM, you cannot use it this way, and you
>>> probably should not.
>>> I recommend using a sink to write it into a formatted file instead
>>> (like a CSV), or, if it is too big, into something splittable.
>>>
>>> Regards,
>>> Bastien
>>>
>>> --
>>>
>>> Bastien DINE
>>> Data Architect / Software Engineer / Sysadmin
>>> bastiendine.io
>>>
>>> On Thu, Feb 10, 2022 at 20:32, Antonio Si wrote:
>>>
>>>> Hi,
>>>>
>>>> I am using the State Processor API to read the states from a
>>>> savepoint file.
>>>> It works fine when the state size is small, but when the state size
>>>> is larger, around 11 GB, I am getting an OOM. I think it happens when
>>>> it is doing a dataSource.collect() to obtain the states. The stack
>>>> trace is copied at the end of the message.
>>>>
>>>> Any suggestions or hints would be very helpful.
>>>>
>>>> Thanks in advance.
>>>>
>>>> Antonio.
>>>> [quoted stack trace snipped; the full trace is in the original message below]
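Bastien's S3 suggestion above can be sketched roughly as follows. This is a minimal, untested sketch assuming Flink 1.12/1.13 with one of the S3 filesystem plugins (flink-s3-fs-hadoop or flink-s3-fs-presto) enabled on the cluster; the bucket name and the sample data are hypothetical placeholders, not from the thread.

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;

public class S3SinkSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the DataSet read from the savepoint.
        DataSet<Tuple2<String, Long>> states =
                env.fromElements(Tuple2.of("key-a", 1L), Tuple2.of("key-b", 2L));

        // Write to S3 with a CSV sink instead of collect()-ing to the
        // JobManager. "my-bucket" is a hypothetical bucket name; the s3://
        // scheme is resolved by the S3 filesystem plugin.
        states.writeAsCsv("s3://my-bucket/state-dump",
                FileSystem.WriteMode.OVERWRITE);

        env.execute("dump-states-to-s3");
    }
}
```

With the plugin installed and credentials configured (e.g. via flink-conf.yaml), the same job should work unchanged for any `s3://bucket/path` target.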
Re: question on dataSource.collect() on reading states from a savepoint file
Thanks Bastien. Can you point to an example of using a sink, as we are planning to write to S3?

Thanks again for your help.

Antonio.

On Thu, Feb 10, 2022 at 11:49 AM bastien dine wrote:
> Hello Antonio,
>
> The .collect() method should be used with caution, as it collects the
> DataSet (multiple partitions on multiple TMs) into a single List on the
> JM (so in memory).
> Unless you have a lot of RAM, you cannot use it this way, and you
> probably should not.
> I recommend using a sink to write it into a formatted file instead
> (like a CSV), or, if it is too big, into something splittable.
>
> Regards,
> Bastien
>
> --
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>
> On Thu, Feb 10, 2022 at 20:32, Antonio Si wrote:
>
>> Hi,
>>
>> I am using the State Processor API to read the states from a savepoint
>> file.
>> It works fine when the state size is small, but when the state size is
>> larger, around 11 GB, I am getting an OOM. I think it happens when it
>> is doing a dataSource.collect() to obtain the states. The stack trace
>> is copied at the end of the message.
>>
>> Any suggestions or hints would be very helpful.
>>
>> Thanks in advance.
>>
>> Antonio.
>> [quoted stack trace snipped; the full trace is in the original message below]
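The collect()-vs-sink contrast Bastien describes can be sketched in the DataSet API like this. A minimal sketch, assuming Flink 1.12; the path, parallelism, and sample data are placeholders.

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;

public class SinkInsteadOfCollect {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> states = env.fromElements("state-1", "state-2", "state-3");

        // DON'T: collect() pulls every partition from every TaskManager into
        // a single in-memory List on the JobManager.
        // List<String> all = states.collect();

        // DO: each parallel subtask streams its own partition straight to a
        // part file, so nothing is materialized on the JobManager. With
        // parallelism > 1 the output is a directory of part files, i.e. a
        // "splittable" result.
        states.writeAsText("file:///tmp/state-dump", FileSystem.WriteMode.OVERWRITE)
              .setParallelism(4);

        env.execute("dump-states");
    }
}
```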
question on dataSource.collect() on reading states from a savepoint file
Hi,

I am using the State Processor API to read the states from a savepoint file.
It works fine when the state size is small, but when the state size is larger, around 11 GB, I am getting an OOM. I think it happens when it is doing a dataSource.collect() to obtain the states. The stack trace is copied at the end of the message.

Any suggestions or hints would be very helpful.

Thanks in advance.

Antonio.

java.lang.OutOfMemoryError: null
    at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) ~[?:1.8.0_282]
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) ~[?:1.8.0_282]
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) ~[?:1.8.0_282]
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) ~[?:1.8.0_282]
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) ~[?:1.8.0_282]
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786) ~[?:1.8.0_282]
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189) ~[?:1.8.0_282]
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_282]
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.serializeAccumulator(ExecutionGraph.java:806) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$getAccumulatorsSerialized$0(ExecutionGraph.java:795) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321) ~[?:1.8.0_282]
    at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) ~[?:1.8.0_282]
    at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1699) ~[?:1.8.0_282]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_282]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_282]
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_282]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_282]
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) ~[?:1.8.0_282]
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.getAccumulatorsSerialized(ExecutionGraph.java:792) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:325) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.scheduler.SchedulerBase.requestJob(SchedulerBase.java:810) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.jobStatusChanged(JobMaster.java:1085) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.access$2200(JobMaster.java:131) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.jobmaster.JobMaster$JobManagerJobStatusListener.lambda$jobStatusChanges$0(JobMaster.java:1356) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1
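For context: DataSet.collect() ships the result back through a job accumulator that the JobManager serializes at job completion, which is consistent with the ExecutionGraph.serializeAccumulator frames in the trace above. The read-and-sink pattern suggested in the replies looks roughly like this with the State Processor API (Flink 1.12). This is a hedged sketch, not the poster's actual code: the operator uid "my-operator-uid", the ValueState named "count", and the S3 paths are all hypothetical placeholders.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadSavepointSketch {

    // Emits one CSV line per key from a hypothetical ValueState<Long> "count".
    static class CountReader extends KeyedStateReaderFunction<String, String> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out)
                throws Exception {
            out.collect(key + "," + count.value());
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Savepoint path and operator uid are placeholders.
        ExistingSavepoint savepoint = Savepoint.load(
                env, "s3://my-bucket/savepoints/savepoint-xxxx", new MemoryStateBackend());

        DataSet<String> states =
                savepoint.readKeyedState("my-operator-uid", new CountReader());

        // Sink to files rather than states.collect(), which would try to
        // buffer the ~11 GB of state in JobManager memory.
        states.writeAsText("s3://my-bucket/state-dump", FileSystem.WriteMode.OVERWRITE);

        env.execute("read-savepoint");
    }
}
```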