unsubscribe

2022-12-11 Thread Ayush
unsubscribe


Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-09 Thread Ayush Chauhan
My use case is that as soon as the Avro message version changes, I want
to reload the job graph so that I can update the downstream Iceberg table.

The Iceberg FlinkSink takes the table schema at job start and cannot be
updated at runtime, so I want to trigger a graceful shutdown and restart
the job.

Can I reload the job graph to achieve that?
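
For reference, a custom OffsetsInitializer (as Arvid suggests below) might
look roughly like the following; a minimal sketch assuming the Flink 1.14+
KafkaSource API, with a hypothetical class name:

import java.util.Collection;
import java.util.Map;

import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

// Bounds each partition at whatever its log-end offset is when the job starts.
public class SnapshotEndOffsetsInitializer implements OffsetsInitializer {

    @Override
    public Map<TopicPartition, Long> getPartitionOffsets(
            Collection<TopicPartition> partitions,
            PartitionOffsetsRetriever retriever) {
        // Ask the brokers for the current end offset of every assigned partition.
        return retriever.endOffsets(partitions);
    }

    @Override
    public OffsetResetStrategy getAutoOffsetResetStrategy() {
        return OffsetResetStrategy.LATEST;
    }
}

It would then be plugged in via .setBounded(new SnapshotEndOffsetsInitializer()).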



On Wed, Dec 8, 2021 at 8:11 PM Arvid Heise  wrote:

> Hi Ayush,
>
> DeserializationSchema.isEndOfStream was only ever supported by Kafka. For
> the new Kafka source, the recommended way is to use the bounded mode, like this:
>
> KafkaSource<String> source =
>     KafkaSource.<String>builder()
>         ...
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setBounded(OffsetsInitializer.latest())
>         .build();
>
> You can implement your own OffsetsInitializer or use a provided one.
>
> On Wed, Dec 8, 2021 at 9:19 AM Hang Ruan  wrote:
>
>> There is no way to end the kafka stream from the deserializer.
>>
>> When would you want to end the stream? Could you explain why you need to
>> end the kafka stream without using the offset?
>>
>> Ayush Chauhan wrote on Wed, Dec 8, 2021 at 15:29:
>>
>>>
>>> https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69
>>>
>>>
>>>
>>> On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger 
>>> wrote:
>>>
>>>> Hi Ayush,
>>>>
>>>> I couldn't find the documentation you've mentioned. Can you send me a
>>>> link to it?
>>>>
>>>> On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Can you please let me know the alternatives to isEndOfStream()? According
>>>>> to the docs, this method will no longer be used to determine the end of
>>>>> the stream.
>>>>>
>>>>> --
>>>>>  Ayush Chauhan
>>>>>  Data Platform
>>>>>  [image: mobile-icon]  +91 9990747111
>>>>>
>>>>>
>>>>> This email is intended only for the person or the entity to whom it is
>>>>> addressed. If you are not the intended recipient, please delete this email
>>>>> and contact the sender.
>>>>>
>>>>
>>>
>>> --
>>>  Ayush Chauhan
>>>  Data Platform
>>>  [image: mobile-icon]  +91 9990747111
>>>
>>>
>>> This email is intended only for the person or the entity to whom it is
>>> addressed. If you are not the intended recipient, please delete this email
>>> and contact the sender.
>>>
>>

-- 
 Ayush Chauhan
 Data Platform
 [image: mobile-icon]  +91 9990747111




Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-07 Thread Ayush Chauhan
https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69



On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger  wrote:

> Hi Ayush,
>
> I couldn't find the documentation you've mentioned. Can you send me a link
> to it?
>
> On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan 
> wrote:
>
>> Hi,
>>
>> Can you please let me know the alternatives to isEndOfStream()? According
>> to the docs, this method will no longer be used to determine the end of
>> the stream.
>>
>> --
>>  Ayush Chauhan
>>  Data Platform
>>  [image: mobile-icon]  +91 9990747111
>>
>>
>> This email is intended only for the person or the entity to whom it is
>> addressed. If you are not the intended recipient, please delete this email
>> and contact the sender.
>>
>

-- 
 Ayush Chauhan
 Data Platform
 [image: mobile-icon]  +91 9990747111




Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-07 Thread Ayush Chauhan
Hi,

Can you please let me know the alternatives to isEndOfStream()? According
to the docs, this method will no longer be used to determine the end of
the stream.

-- 
 Ayush Chauhan
 Data Platform
 [image: mobile-icon]  +91 9990747111




Flink-13 table API wrongly adding padding for IN clause elements

2021-11-29 Thread Ayush Chauhan
Hi,

In Flink 1.13, while using a filter/where condition in the Table API, I am
getting wrong results. While debugging, I found that it pads the IN-clause
elements to the length of the first element in the IN clause.

Here's the sample code

tEnv.toAppendStream(
    input.where($("ename").in("O2CartPageLoaded", "OrderPlaced", "O2MenuViewed", "opened_app"))
        .select($("ename")),
    Utils.getTypeInfo("ename:string")).print();

Screenshot of hashset
[image: Screenshot 2021-11-29 at 16.24.59.png]

If I change the order of the IN-clause elements, the padding changes accordingly:

tEnv.toAppendStream(
    input.where($("ename").in("OrderPlaced", "O2CartPageLoaded", "O2MenuViewed", "opened_app"))
        .select($("ename")),
    Utils.getTypeInfo("ename:string")).print();

Screenshot of hashset
[image: Screenshot 2021-11-29 at 16.23.20.png]
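
A possible workaround, as a minimal sketch, assuming the padding comes from
CHAR-type unification of the IN-list literals: spell the comparison out per
value, so each literal keeps its own length (lit is
org.apache.flink.table.api.Expressions.lit):

tEnv.toAppendStream(
    input.where(
            $("ename").isEqual(lit("O2CartPageLoaded"))
                .or($("ename").isEqual(lit("OrderPlaced")))
                .or($("ename").isEqual(lit("O2MenuViewed")))
                .or($("ename").isEqual(lit("opened_app"))))
        .select($("ename")),
    Utils.getTypeInfo("ename:string")).print();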

--
 Ayush Chauhan
 Data Platform




Re: Flink CDC job getting failed due to G1 old gc

2021-08-01 Thread Ayush Chauhan
Hi Leonard,

I am using Flink 1.11.2 with debezium-json to read the CDC data generated
by Debezium.

For each table, I convert the Kafka dynamic table to a retract stream, and
finally that stream is converted to a DataStream<RowData>. Here's the sample
function:

private DataStream<RowData> getDataStream(String sql) {
    LOGGER.debug(sql);
    Table out = tEnv.sqlQuery(sql);
    DataStream<Tuple2<Boolean, Row>> dsRow = tEnv.toRetractStream(out, Row.class);
    return dsRow.map((MapFunction<Tuple2<Boolean, Row>, RowData>) t2 -> {
        // t2.f0 is the retract flag; t2.f1 carries the row and its RowKind
        RowKind rowKind = t2.f1.getKind();
        GenericRowData genericRowData = new GenericRowData(rowKind, t2.f1.getArity());
        for (int pos = 0; pos < t2.f1.getArity(); pos = pos + 1) {
            Object object = t2.f1.getField(pos);
            Object convertedType;
            if (object instanceof String) {
                convertedType = RowDataUtil.convertConstant(Types.StringType.get(), object);
            } else if (object instanceof LocalDateTime) {
                convertedType = TimestampData.fromLocalDateTime((LocalDateTime) object);
            } else {
                convertedType = object;
            }
            genericRowData.setField(pos, convertedType);
        }
        return genericRowData;
    });
}


I then pass this datastream to the Flink sink.

FlinkSink.forRowData(rowDataDataStream)
    .table(icebergTable)
    .tableSchema(FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergTable.schema())))
    .tableLoader(tableLoader)
    .equalityFieldColumns(tableConfig.getEqualityColumns())
    .build();


Please let me know if you need some other information too


On Mon, Aug 2, 2021 at 7:48 AM Leonard Xu  wrote:

> Hi, Ayush
>
> Thanks for the detailed description.
>
> Before analyzing the issue, I have two questions: which Flink and Flink
> CDC versions are you using? Is Flink CDC used via SQL or DataStream?
> It would be helpful if you could post your Flink CDC connector parameters.
>
> Best,
> Leonard
>
> 在 2021年7月29日,18:57,Ayush Chauhan  写道:
>
> Hi all,
>
> We are using Flink + iceberg to consume CDC data. We have combined all the
> tables of a single DB in one job. Our job is frequently running into GC
> issues. Earlier it was running on the default parallel GC, and I have changed
> it to G1GC. G1GC did bring some improvements, but I am still facing the same
> problem.
>
> Following are the params on my job - -ytm 5120m -yjm 1024m -yD
> env.java.opts="-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35"
>
> This job is running CDC ingestion for 17 tables with a parallelism of 1,
> and throughput is around ~10k messages per 10-minute checkpointing
> interval.
>
> I am attaching a part of the thread dump in this email.
>
> During old-generation GC, the job gets stuck, and its checkpointing, which
> normally takes under 1 sec, increases exponentially up to the timeout
> threshold. The job then fails either due to checkpoint timeout or because
> the task manager's heartbeat is lost.
>
> [image: thread dump screenshots]
>
>
> --
>  Ayush Chauhan
>
>
> This email is intended only for the person or the entity to whom it is
> addressed. If you are not the intended recipient, please delete this email
> and contact the sender.
> 
>
>
>

-- 
 Ayush Chauhan
 Data Platform
 [image: mobile-icon]  +91 9990747111




Re: Flink CDC job getting failed due to G1 old gc

2021-07-30 Thread Ayush Chauhan
I am using RocksDB as the state backend. My pipeline's checkpoint size is
hardly ~100 KB.

I will add the GC and heap-dump config and will let you know of any findings.

Right now I suspect there is a memory leak either in the Flink CDC code or in
the Iceberg sink: https://iceberg.apache.org/flink/#overwrite-data
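
A minimal sketch of the flags for that, assuming JDK 8 and the -yD syntax
used earlier in this thread (file paths are placeholders):

-yD env.java.opts="-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCDateStamps
-Xloggc:/tmp/tm-gc.log -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/tm-heap.hprof"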

On Fri, Jul 30, 2021 at 12:31 PM David Morávek  wrote:

> Hi Ayush,
>
> This would signal that some of your task managers are running out of memory,
> which would cause frequent old-gen GC, because one cycle is not able to
> free up enough memory.
>
> What state backend are you using? If in-memory, off-loading state to
> RocksDB might help.
>
> Anyway, the general approach here would be the same as for any Java
> application:
> - You can enable GC logs and validate this really happens (a more
> lightweight check would be just using something like `jstat -gccause
> <pid> ...`).
> - Take a heap dump of the affected TM to see what exactly is consuming
> your memory (eclipse MAT is fairly good with large heaps).
>
> Best,
> D.
>
>

-- 
 Ayush Chauhan
 Data Platform
 [image: mobile-icon]  +91 9990747111




Re: Debezium CDC | OOM

2021-04-22 Thread Ayush Chauhan
Hi Matthias,

I am using RocksDB as a state backend. I think the Iceberg sink is not able
to propagate back pressure to the source, which is resulting in OOM for my
CDC pipeline.
Please refer to this - https://github.com/apache/iceberg/issues/2504
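
In the meantime, a crude way to throttle consumption is to block inside an
operator. A minimal sketch (not a Flink CDC API, just a generic map function,
assuming Guava is on the classpath):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import com.google.common.util.concurrent.RateLimiter;

// Caps throughput per subtask; blocking in map() also exerts back pressure upstream.
public class ThrottleMap<T> extends RichMapFunction<T, T> {

    private final double recordsPerSecond;
    private transient RateLimiter limiter;

    public ThrottleMap(double recordsPerSecond) {
        this.recordsPerSecond = recordsPerSecond;
    }

    @Override
    public void open(Configuration parameters) {
        // One limiter per parallel subtask, so the effective global rate is
        // recordsPerSecond * parallelism.
        limiter = RateLimiter.create(recordsPerSecond);
    }

    @Override
    public T map(T value) {
        limiter.acquire();
        return value;
    }
}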



On Thu, Apr 22, 2021 at 8:44 PM Matthias Pohl 
wrote:

> Hi Ayush,
> Which state backend have you configured [1]? Have you considered trying
> out RocksDB [2]? RocksDB might help with persisting at least keyed state.
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#choose-the-right-state-backend
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
>
> On Thu, Apr 22, 2021 at 7:52 AM Ayush Chauhan 
> wrote:
>
>> Hi,
>> I am using flink cdc to stream CDC changes into an iceberg table. When I
>> first run the flink job for a topic which has all the data for a table, it
>> runs out of heap memory as flink tries to load all the data during my
>> 15-minute checkpointing interval. Right now, the only solution I have is to
>> pass *-ytm 8192 -yjm 2048m* for a table with 10M rows and then reduce it
>> after flink has consumed all the data. Is there a way to tell the flink cdc
>> code to trigger a checkpoint or throttle the consumption speed (I think
>> back pressure should have handled this)?
>>
>> --
>>  Ayush Chauhan
>>  Software Engineer | Data Platform
>>  [image: mobile-icon]  +91 9990747111
>>
>>
>> This email is intended only for the person or the entity to whom it is
>> addressed. If you are not the intended recipient, please delete this email
>> and contact the sender.
>>
>

-- 
 Ayush Chauhan
 Software Engineer | Data Platform
 [image: mobile-icon]  +91 9990747111




Debezium CDC | OOM

2021-04-21 Thread Ayush Chauhan
Hi,
I am using flink cdc to stream CDC changes into an iceberg table. When I
first run the flink job for a topic which has all the data for a table, it
runs out of heap memory as flink tries to load all the data during my
15-minute checkpointing interval. Right now, the only solution I have is to
pass *-ytm 8192 -yjm 2048m* for a table with 10M rows and then reduce it
after flink has consumed all the data. Is there a way to tell the flink cdc
code to trigger a checkpoint or throttle the consumption speed (I think
back pressure should have handled this)?

-- 
 Ayush Chauhan
 Software Engineer | Data Platform
 [image: mobile-icon]  +91 9990747111




Re: Streaming data to parquet

2020-09-11 Thread Ayush Verma
Hi,

Looking at the problem broadly, file size is directly tied to how often you
commit. No matter which system you use, this variable will always be there.
If you commit frequently, you will be close to real time, but you will have
numerous small files. If you commit after long intervals, you will have
larger files, but this is as good as a "batch world". We solved this problem
at my company by having two systems: one to commit the files at small
intervals, thus bringing data into durable storage reliably, and one to roll
up these small files. It's actually really simple to implement this if you
don't try to do it in a single job.

Best
Ayush

On Fri, Sep 11, 2020 at 2:22 PM Robert Metzger  wrote:

> Hi Marek,
>
> what you are describing is a known problem in Flink. There are some
> thoughts on how to address this in
> https://issues.apache.org/jira/browse/FLINK-11499 and
> https://issues.apache.org/jira/browse/FLINK-17505
> Maybe some ideas there help you already for your current problem (use long
> checkpoint intervals).
>
> A related idea to (2) is to write your data with the Avro format, and then
> regularly use a batch job to transform your data from Avro to Parquet.
>
> I hope these are some helpful pointers. I don't have a good overview over
> other potential open source solutions.
>
> Best,
> Robert
>
>
> On Thu, Sep 10, 2020 at 5:10 PM Marek Maj  wrote:
>
>> Hello Flink Community,
>>
>> When designing our data pipelines, we very often encounter the
>> requirement to stream traffic (usually from kafka) to external distributed
>> file system (usually HDFS or S3). This data is typically meant to be
>> queried from hive/presto or similar tools. Preferably data sits in columnar
>> format like parquet.
>>
>> Currently, using flink, it is possible to leverage StreamingFileSink to
>> achieve what we want to some extent. It satisfies our requirements to
>> partition data by event time, ensure exactly-once semantics and
>> fault-tolerance with checkpointing. Unfortunately, when using a bulk writer
>> like ParquetWriter, that comes at the price of producing a large number of
>> files, which degrades query performance.
>>
>> I believe that many companies struggle with similar use cases. I know
>> that some of them have already approached that problem. Solutions like
>> Alibaba Hologres or the Netflix solution with Iceberg described during FF
>> 2019 have emerged. Given that a full transition to a real-time data
>> warehouse may take a
>> significant amount of time and effort, I would like to primarily focus on
>> solutions for tools like hive/presto backed up by a distributed file
>> system. Usually those are the systems that we are integrating with.
>>
>> So what options do we have? Maybe I missed some existing open source
>> tool?
>>
>> Currently, I can come up with two approaches using flink exclusively:
>> 1. Cache incoming traffic in flink state until trigger fires according to
>> rolling strategy, probably with some late events special strategy and then
>> output data with StreamingFileSink. Solution is not perfect as it may
>> introduce additional latency and queries will still be less performant
>> compared to fully compacted files (late events problem). And the biggest
>> issue I am afraid of is actually a performance drop while releasing data
>> from flink state and its peak character
>> 2. Focus on implementing batch rewrite job that will compact data
>> offline. Source for the job could be both kafka or small files produced by
>> another job that uses plain StreamingFileSink. The drawback is that whole
>> system gets more complex, additional maintenance is needed and, maybe what
>> is more troubling, we enter the batch world again (how could we know that no
>> more late data will come so that we can safely run the job?)
>>
>> I would really love to hear what are community thoughts on that.
>>
>> Kind regards
>> Marek
>>
>


Re: Using S3 as a streaming File source

2020-09-01 Thread Ayush Verma
Word of caution: streaming from S3 is really cost-prohibitive, as the only
way to detect new files is to continuously spam the S3 List API.
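
For the InputFileFormat question quoted below, here is a minimal sketch
(class name hypothetical) of an unsplittable FileInputFormat that emits each
archive as one byte[] record, assuming the .tgz bytes are unpacked downstream:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

public class WholeArchiveInputFormat extends FileInputFormat<byte[]> {

    private boolean consumed;

    public WholeArchiveInputFormat(Path directory) {
        super(directory);
        unsplittable = true; // each .tgz must be read as a single unit
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split); // opens the protected 'stream' field
        consumed = false;
    }

    @Override
    public boolean reachedEnd() {
        return consumed;
    }

    @Override
    public byte[] nextRecord(byte[] reuse) throws IOException {
        // Slurp the whole file: one record per archive.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        int n;
        while ((n = stream.read(chunk)) != -1) {
            buffer.write(chunk, 0, n);
        }
        consumed = true;
        return buffer.toByteArray();
    }
}

Note that exactly-once and delete-after-processing are separate concerns, and
PROCESS_CONTINUOUSLY re-scans the bucket, which is exactly the listing cost
warned about above.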

On Tue, Sep 1, 2020 at 4:50 PM Jörn Franke  wrote:

> Why don’t you get an S3 notification on SQS and do the actions from there?
>
> You will probably need to write the content of the files to a NoSQL
> database.
>
> Alternatively, send the s3 notification to Kafka and read it in flink from
> there.
>
>
> https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html
>
>
> Am 01.09.2020 um 16:46 schrieb orionemail :
>
> 
> Hi,
>
> I have an S3 bucket that is continuously written to by millions of
> devices.  These upload small compressed archives.
>
> What I want to do is treat the tar gzipped (.tgz) files as a streaming
> source and process each archive.  The archive contains three files that
> each might need to be processed.
>
> I see that
>
> env.readFile(f, bucket, FileProcessingMode.PROCESS_CONTINUOUSLY, 1L).print();
>
> might do what I need, but I am unsure how best to implement 'f' - the
> InputFileFormat.  Is there a similar example for me to reference?
>
> Or is this idea not workable with this method? I need to ensure exactly
> once, and also trigger removal of the files after processing.
>
> Thanks,
>
>
> Sent with ProtonMail  Secure Email.
>
>


Re: Using S3 as a sink (StreamingFileSink)

2019-08-18 Thread Ayush Verma
Hi,
I would suggest you upgrade flink to 1.7.x and flink-s3-fs-hadoop to 1.7.2.
You might be facing this issue:

   - https://issues.apache.org/jira/browse/FLINK-11496
   - https://issues.apache.org/jira/browse/FLINK-11302

Kind regards
Ayush Verma

On Sun, Aug 18, 2019 at 6:02 PM taher koitawala  wrote:

> We used EMR version 5.20, which has Flink 1.6.2, and all other libraries
> matched this version, so flink-s3-fs-hadoop was 1.6.2 as well.
>
> On Sun, Aug 18, 2019, 9:55 PM Ayush Verma  wrote:
>
>> Hello, could you tell us the version of flink-s3-fs-hadoop library that
>> you are using ?
>>
>> On Sun 18 Aug 2019 at 16:24, taher koitawala  wrote:
>>
>>> Hi Swapnil,
>>> We faced this problem once. I think changing the checkpoint dir to
>>> hdfs and keeping the sink dir on s3 with EMRFS s3 consistency enabled solves
>>> this problem. If you are not using emr, then I don't know how else it can be
>>> solved. But in a nutshell, because EMRFS s3 consistency uses DynamoDB in
>>> the back end to check for all files being written to s3, it kind of makes
>>> s3 consistent, and the streaming file sink works just fine.
>>>
>>>
>>>
>>> On Sat, Aug 17, 2019, 3:32 AM Swapnil Kumar  wrote:
>>>
>>>> Hello, we are using Flink to process input events and aggregate and
>>>> write the o/p of our streaming job to S3 using StreamingFileSink, but
>>>> whenever we try to restore the job from a savepoint, the restoration fails
>>>> with a missing-part-files error. As per my understanding, s3 deletes those
>>>> part (intermediate) files, and they can no longer be found on s3. Is there
>>>> a workaround for this, so that we can use s3 as a sink?
>>>>
>>>> --
>>>> Thanks,
>>>> Swapnil Kumar
>>>>
>>>


Re: Using S3 as a sink (StreamingFileSink)

2019-08-18 Thread Ayush Verma
Hello, could you tell us the version of flink-s3-fs-hadoop library that you
are using ?

On Sun 18 Aug 2019 at 16:24, taher koitawala  wrote:

> Hi Swapnil,
> We faced this problem once. I think changing the checkpoint dir to hdfs
> and keeping the sink dir on s3 with EMRFS s3 consistency enabled solves this
> problem. If you are not using emr, then I don't know how else it can be
> solved. But in a nutshell, because EMRFS s3 consistency uses DynamoDB in
> the back end to check for all files being written to s3, it kind of makes
> s3 consistent, and the streaming file sink works just fine.
>
>
>
> On Sat, Aug 17, 2019, 3:32 AM Swapnil Kumar  wrote:
>
>> Hello, we are using Flink to process input events and aggregate and write
>> the o/p of our streaming job to S3 using StreamingFileSink, but whenever we
>> try to restore the job from a savepoint, the restoration fails with a
>> missing-part-files error. As per my understanding, s3 deletes those
>> part (intermediate) files, and they can no longer be found on s3. Is there a
>> workaround for this, so that we can use s3 as a sink?
>>
>> --
>> Thanks,
>> Swapnil Kumar
>>
>


Issue using Flink on EMR

2019-06-03 Thread Ayush Verma
Hello,

We have a Flink on EMR setup following this guide. YARN
apparently changes the io.tmp.dirs property to /mnt/yarn & /mnt1/yarn. When
using these directories, the flink job gets the following error.

2019-05-22 12:23:12,515 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
my-flink-job (e92bf814c495e2c713e24f1d37aa3afd) switched from state
RUNNING to FAILING.
java.nio.file.NoSuchFileException:
/mnt/yarn/usercache/hadoop/appcache/application_1558347223117_0001,/mnt1/yarn/usercache/hadoop/appcache/application_1558347223117_0001/.tmp_c729afcc-7bd7-4422-8232-306e28bc62c1
 at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
 at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
 at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
 at 
sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
 at 
java.nio.file.spi.FileSystemProvider.newOutputStream(FileSystemProvider.java:434)
 at java.nio.file.Files.newOutputStream(Files.java:216)
 at 
org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator.apply(RefCountedTmpFileCreator.java:80)
 at 
org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator.apply(RefCountedTmpFileCreator.java:39)
 at 
org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.openNew(RefCountedBufferingFileStream.java:174)
 at 
org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.boundedBufferingFileStream(S3RecoverableFsDataOutputStream.java:271)
 at 
org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.newStream(S3RecoverableFsDataOutputStream.java:236)
 at 
org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.open(S3RecoverableWriter.java:74)
 at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:221)
 at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
 at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
 at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
 at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:565)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:724)
 at 
org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
 at 
net.skyscanner.data.platform.flink.parquet.ParquetConsumerJob$3.processElement(ParquetConsumerJob.java:96)
 at 
net.skyscanner.data.platform.flink.parquet.ParquetConsumerJob$3.processElement(ParquetConsumerJob.java:92)
 at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
 at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
 at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
 at ... (stack trace truncated)
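
A possible workaround, based on the assumption (suggested by the comma-joined
path in the exception) that the s3 filesystem's tmp-file creator treats the
multi-entry io.tmp.dirs value as a single path: pin io.tmp.dirs to one
directory in flink-conf.yaml, e.g.

io.tmp.dirs: /mnt/yarn   # single entry; the path is a placeholder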

Limitations in StreamingFileSink BulkFormat

2019-05-31 Thread Ayush Verma
Hello,

I am using the StreamingFileSink BulkFormat in my Flink stream processing
job to output parquet files to S3. Now the
StreamingFileSink.BulkFormatBuilder
<https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.BulkFormatBuilder.html>,
does not have an option to set a custom rolling policy. It will roll the
files whenever the checkpoint triggers
<https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.html>.
It would be better to have a rolling policy based on both *size* and *time*.
One option is to write our own StreamingFileSink, which does accept a
custom rolling policy, but I suspect there might be some reason for this
behaviour.
I would like to get the opinion of Flink experts on this. And if there are
any potential workarounds to get the desired behaviour.
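
For contrast, the row-format builder does accept a policy based on both size
and time; a minimal sketch, assuming a Flink version where
DefaultRollingPolicy.builder() is available (the path and thresholds are
placeholders):

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

// Rolls on 128 MiB or 15 minutes, whichever comes first. Bulk formats roll
// only on checkpoints because a part file written by a bulk encoder cannot
// be resumed mid-file after a failure.
StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("s3://bucket/out"), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(DefaultRollingPolicy.builder()
        .withMaxPartSize(128 * 1024 * 1024)
        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
        .build())
    .build();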

Kind regards
Ayush Verma


Re: Heap Problem with Checkpoints

2018-08-09 Thread Ayush Verma
Hello Piotr, I work with Fabian and have been investigating the memory leak
associated with issues mentioned in this thread. I took a heap dump of our
master node and noticed that there was >1gb (and growing) worth of entries
in the set, /files/, in class *java.io.DeleteOnExitHook*.
Almost all the strings in this set look like,
/tmp/hadoop-root/s3a/output-*.tmp.

This means that the checkpointing code, which uploads the data to s3,
maintains it in a temporary local file, which is supposed to be deleted on
exit of the JVM. In our case, the checkpointing is quite heavy, and because
we have a long-running flink cluster, it causes this /set/ to grow
unbounded, eventually causing an OOM. Please see these images:

[image: heap dump screenshots]

The culprit seems to be *org.apache.hadoop.fs.s3a.S3AOutputStream*, which
in-turn, calls
*org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite()*. If we
follow the method call chain from there, we end up at
*org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite()*, where we
can see the temp file being created and the method deleteOnExit() being
called.

Maybe instead of relying on *deleteOnExit()*, we could keep track of these tmp
files and, as soon as they are no longer required, delete them ourselves.
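
A minimal sketch of that idea (upload() is a hypothetical stand-in for the
S3 write):

import java.nio.file.Files;
import java.nio.file.Path;

Path tmp = Files.createTempFile("s3a-output-", ".tmp");
try {
    upload(tmp); // hypothetical: stream the staged bytes to S3
} finally {
    // Eager cleanup: nothing is ever added to java.io.DeleteOnExitHook,
    // so the set cannot grow for the lifetime of the JVM.
    Files.deleteIfExists(tmp);
}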



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Storage options for RocksDBStateBackend

2017-05-12 Thread Ayush Goyal
Till and Stephan, thanks for your clarification.

@Till One more question: from what I have read about checkpointing [1],
list operations don't seem likely to be performed frequently, so storing
the state backend on s3 shouldn't have any severe impact on flink
performance. Is this assumption right?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.2/internals/stream_checkpointing.html

-- Ayush
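
For reference, the settings under discussion, as a minimal flink-conf.yaml
sketch with Flink 1.2-era keys (the bucket name is a placeholder):

state.backend: rocksdb
state.backend.fs.checkpointdir: s3://my-bucket/flink/checkpoints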

On Fri, May 12, 2017 at 1:05 AM Stephan Ewen <se...@apache.org> wrote:

> Small addition to Till's comment:
>
> In the case where file:// points to a mounted distributed file system
> (NFS, MapRFs, ...), then it actually works. The important thing is that the
> filesystem where the checkpoints go is replicated (fault tolerant) and
> accessible from all nodes.
>
> On Thu, May 11, 2017 at 2:16 PM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Ayush,
>>
>> you’re right that RocksDB is the recommended state backend because of the
>> above-mentioned reasons. In order to make the recovery properly work, you
>> have to configure a shared directory for the checkpoint data via
>> state.backend.fs.checkpointdir. You can basically configure any file
>> system which is supported by Hadoop (no HDFS required). The reason is that
>> we use Hadoop to bridge between different file systems. The only thing you
>> have to make sure is that you have the respective file system
>> implementation in your class path.
>>
>> I think you can access Windows Azure Blob Storage via Hadoop [1]
>> similarly to access S3, for example.
>>
>> If you use S3 to store your checkpoint data, then you will benefit from
>> all the advantages of S3 but also suffer from its drawbacks (e.g. that list
>> operations are more costly). But these are not specific to Flink.
>>
>> A URL like file:// usually indicates a local file. Thus, if your Flink
>> cluster is not running on a single machine, then this won’t work.
>>
>> [1] https://hadoop.apache.org/docs/stable/hadoop-azure/index.html
>>
>> Cheers,
>> Till
>> ​
>>
>> On Thu, May 11, 2017 at 10:41 AM, Ayush Goyal <ay...@helpshift.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I had a few questions regarding checkpoint storage options using
>>> RocksDBStateBackend. In the flink 1.2 documentation, it is the
>>> recommended state backend due to its ability to store large states and
>>> asynchronous snapshotting.
>>> For high availability it seems HDFS is the recommended store for state
>>> backend
>>> data. In AWS deployment section, it is also mentioned that s3 can be
>>> used for
>>> storing state backend data.
>>>
>>> We don't want to depend on a hadoop cluster for flink deployment, so I
>>> had the following questions:
>>>
>>> 1. Can we use any storage backend supported by flink for storing
>>> RocksDBStateBackend data with file urls? There are quite a few supported,
>>> as mentioned here:
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/filesystems.html
>>> and here:
>>> https://github.com/apache/flink/blob/master/docs/dev/batch/connectors.md
>>>
>>> 2. Is there some work already done to support Windows Azure Blob Storage
>>> for
>>> storing State backend data? There are some docs here:
>>> https://github.com/apache/flink/blob/master/docs/dev/batch/connectors.md
>>> can we utilize this for that?
>>>
>>> 3. If utilizing S3 for state backend, is there any performance impact?
>>>
>>> 4. For high availability can we use an NFS volume for the state backend,
>>> with "file://" urls? Will there be any performance impact?
>>>
>>> PS: I posted this email earlier via nabble, but it's not showing up in
>>> apache archive. So sending again. Apologies if it results in multiple
>>> threads.
>>>
>>> -- Ayush
>>>
>>
>>
>


Storage options for RocksDBStateBackend

2017-05-11 Thread Ayush Goyal
Hello,

I had a few questions regarding checkpoint storage options using
RocksDBStateBackend. In the flink 1.2 documentation, it is the recommended
state backend due to its ability to store large states and asynchronous
snapshotting.
For high availability it seems HDFS is the recommended store for state
backend
data. In AWS deployment section, it is also mentioned that s3 can be used
for
storing state backend data.

We don't want to depend on a hadoop cluster for flink deployment, so I had
the following questions:

1. Can we use any storage backend supported by flink for storing
RocksDBStateBackend data with file urls? There are quite a few supported, as
mentioned here:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/filesystems.html
and here:
https://github.com/apache/flink/blob/master/docs/dev/batch/connectors.md

2. Is there some work already done to support Windows Azure Blob Storage for
storing State backend data? There are some docs here:
https://github.com/apache/flink/blob/master/docs/dev/batch/connectors.md
can we utilize this for that?

3. If utilizing S3 for state backend, is there any performance impact?

4. For high availability can we use an NFS volume for the state backend, with
"file://" urls? Will there be any performance impact?

PS: I posted this email earlier via nabble, but it's not showing up in
apache archive. So sending again. Apologies if it results in multiple
threads.

-- Ayush

