I haven't used S3 with Flink, but according to this doc:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/
you can set up S3 fairly easily and write to it with a sink, using a path
like s3://path/to/your/file.
The page talks about DataStream, but it should work with DataSet as well (
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/dataset/overview/#data-sinks
).
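
A minimal sketch of what I have in mind (untested on my side, since I
haven't used S3 — it assumes the flink-s3-fs-hadoop plugin is in your
plugins/ directory, credentials are configured, and the bucket/path is a
placeholder):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;

public class WriteToS3 {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // placeholder data; replace with your real DataSet
        DataSet<String> data = env.fromElements("a", "b", "c");
        // each parallel task writes its partition under the given S3 prefix
        data.writeAsText("s3://my-bucket/output", FileSystem.WriteMode.OVERWRITE);
        env.execute("write to s3");
    }
}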

Maybe someone else will have more information about an S3 DataSet sink.

Regards,


On Thu, Feb 10, 2022 at 20:52, Antonio Si <antonio...@gmail.com> wrote:

> Thanks Bastien. Can you point me to an example of using a sink? We are
> planning to write to S3.
>
> Thanks again for your help.
>
> Antonio.
>
> On Thu, Feb 10, 2022 at 11:49 AM bastien dine <bastien.d...@gmail.com>
> wrote:
>
>> Hello Antonio,
>>
>> The .collect() method should be used with caution, as it collects the
>> DataSet (multiple partitions across multiple TaskManagers) into a single
>> List on the JobManager (i.e., in memory).
>> Unless you have a lot of RAM, you cannot use it this way, and you
>> probably should not.
>> I recommend using a sink instead, to write the data into a formatted
>> file (like a CSV file) or, if it's too big, into something splittable.
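>>
>> A minimal sketch of what I mean, assuming a Tuple2 DataSet (the bucket
>> path and records are placeholders):
>>
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.core.fs.FileSystem;
>>
>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>> // placeholder records; in your case this is the DataSet you collect()
>> DataSet<Tuple2<String, Long>> states = env.fromElements(Tuple2.of("k", 1L));
>> // each TM writes its partitions directly; nothing is gathered on the JM
>> states.writeAsCsv("s3://my-bucket/states", FileSystem.WriteMode.OVERWRITE);
>> env.execute("export states as csv");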
>>
>> Regards,
>> Bastien
>>
>> ------------------
>>
>> Bastien DINE
>> Data Architect / Software Engineer / Sysadmin
>> bastiendine.io
>>
>>
>> On Thu, Feb 10, 2022 at 20:32, Antonio Si <antonio...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am using the State Processor API to read the states from a savepoint
>>> file.
>>> It works fine when the state size is small, but when the state size is
>>> larger, around 11 GB, I get an OOM. I think it happens when it does a
>>> dataSource.collect() to obtain the states. The stack trace is copied at
>>> the end of the message.
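>>>
>>> For reference, what I am doing looks roughly like this (simplified;
>>> MyState, MyReaderFunction, and the uid/path are placeholders):
>>>
>>> import java.util.List;
>>> import org.apache.flink.api.java.DataSet;
>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>> import org.apache.flink.runtime.state.memory.MemoryStateBackend;
>>> import org.apache.flink.state.api.ExistingSavepoint;
>>> import org.apache.flink.state.api.Savepoint;
>>>
>>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>> ExistingSavepoint savepoint =
>>>         Savepoint.load(env, "s3://my-bucket/savepoint-dir", new MemoryStateBackend());
>>> // MyReaderFunction extends KeyedStateReaderFunction, one record per key
>>> DataSet<MyState> states =
>>>         savepoint.readKeyedState("my-operator-uid", new MyReaderFunction());
>>> // this is where the OOM happens: everything is gathered on the JM
>>> List<MyState> result = states.collect();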
>>>
>>> Any suggestions or hints would be very helpful.
>>>
>>> Thanks in advance.
>>>
>>> Antonio.
>>>
>>> java.lang.OutOfMemoryError: null
>>>         at
>>> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>>> ~[?:1.8.0_282]
>>>         at
>>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>>> ~[?:1.8.0_282]
>>>         at
>>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>> ~[?:1.8.0_282]
>>>         at
>>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>>> ~[?:1.8.0_282]
>>>         at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>>> ~[?:1.8.0_282]
>>>         at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>>> ~[?:1.8.0_282]
>>>         at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>>> ~[?:1.8.0_282]
>>>         at
>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>> ~[?:1.8.0_282]
>>>         at
>>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.serializeAccumulator(ExecutionGraph.java:806)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$getAccumulatorsSerialized$0(ExecutionGraph.java:795)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>>> ~[?:1.8.0_282]
>>>         at
>>> java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>>> ~[?:1.8.0_282]
>>>         at
>>> java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1699)
>>> ~[?:1.8.0_282]
>>>         at
>>> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>>> ~[?:1.8.0_282]
>>>         at
>>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>>> ~[?:1.8.0_282]
>>>         at
>>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>> ~[?:1.8.0_282]
>>>         at
>>> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>> ~[?:1.8.0_282]
>>>         at
>>> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>>> ~[?:1.8.0_282]
>>>         at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.getAccumulatorsSerialized(ExecutionGraph.java:792)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:325)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> org.apache.flink.runtime.scheduler.SchedulerBase.requestJob(SchedulerBase.java:810)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> org.apache.flink.runtime.jobmaster.JobMaster.jobStatusChanged(JobMaster.java:1085)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> org.apache.flink.runtime.jobmaster.JobMaster.access$2200(JobMaster.java:131)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> org.apache.flink.runtime.jobmaster.JobMaster$JobManagerJobStatusListener.lambda$jobStatusChanges$0(JobMaster.java:1356)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>> [flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> [flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> [flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> [flink-dist_2.11-1.12.2.jar:1.12.2]
>>>         at
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> [flink-dist_2.11-1.12.2.jar:1.12.2]
>>>
>>
