Re: Error While Initializing S3A FileSystem

2019-05-15 Thread Manish Bellani
Thanks, Ken. That makes sense! I'll start a new thread.

On Wed, May 15, 2019 at 7:12 PM Ken Krugler wrote:

> Hi Manish,
>
> It’s best to start a new thread if you have a new question - see
> https://home.apache.org/~hossman/#threadhijack for reasons why…
>
> Regards,
>
> — Ken
>
>
> On May 15, 2019, at 4:46 PM, Manish Bellani wrote:
>
> Hi Ken,
>
> Thanks for the quick response; you are actually right, the job seems to be
> running even after that error appears. It was crashing earlier (due to
> fs.s3a.multipart.size being too high), and I confused it with this error
> since that was the first one popping up and the OOM wasn't apparent
> immediately.
>
> I do have a subsequent question, though, if you don't mind me asking it in
> the same thread. So... if I'm reading the BucketingSink code correctly,
> then if I supply core-site.xml with the following contents, would it not
> pick the *S3RecoverableWriter* code path?:
>
> <configuration>
>
>   <property>
>     <name>fs.s3.impl</name>
>     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>   </property>
>
>   <property>
>     <name>fs.s3a.fast.upload</name>
>     <value>true</value>
>     <description>
>       Use the incremental block upload mechanism with
>       the buffering mechanism set in fs.s3a.fast.upload.buffer.
>       The number of threads performing uploads in the filesystem is defined
>       by fs.s3a.threads.max; the queue of waiting uploads limited by
>       fs.s3a.max.total.tasks.
>       The size of each buffer is set by fs.s3a.multipart.size.
>     </description>
>   </property>
>
>   <property>
>     <name>fs.s3a.fast.upload.buffer</name>
>     <value>array</value>
>     <description>
>       The buffering mechanism to use when using S3A fast upload
>       (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
>       This configuration option has no effect if fs.s3a.fast.upload is false.
>
>       "disk" will use the directories listed in fs.s3a.buffer.dir as
>       the location(s) to save data prior to being uploaded.
>
>       "array" uses arrays in the JVM heap.
>
>       "bytebuffer" uses off-heap memory within the JVM.
>
>       Both "array" and "bytebuffer" will consume memory in a single stream
>       up to the number of blocks set by:
>       fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.
>       If using either of these mechanisms, keep this value low.
>
>       The total number of threads performing work across all threads is set
>       by fs.s3a.threads.max, with fs.s3a.max.total.tasks values setting the
>       number of queued work items.
>     </description>
>   </property>
>
>   <property>
>     <name>fs.s3a.multipart.size</name>
>     <value>10M</value>
>     <description>
>       How big (in bytes) to split upload or copy operations up into.
>       A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
>     </description>
>   </property>
>
>   <property>
>     <name>fs.s3a.fast.upload.active.blocks</name>
>     <value>8</value>
>     <description>
>       Maximum number of blocks a single output stream can have active
>       (uploading, or queued to the central FileSystem instance's pool
>       of queued operations).
>
>       This stops a single stream overloading the shared thread pool.
>     </description>
>   </property>
>
>   <property>
>     <name>fs.s3a.aws.credentials.provider</name>
>     <value>com.amazonaws.auth.EnvironmentVariableCredentialsProvider</value>
>   </property>
>
> </configuration>
>
> I say that because I don't see any files being written under the `/tmp`
> directory with a pattern like ".tmp_UUID", which is what
> RefCountedTmpFileCreator is supposed to create for staging writes to S3
> (and which is wired in by org.apache.flink.fs.s3.common.FlinkS3FileSystem):
>
> public RefCountedFile apply(File file) throws IOException {
>     File directory = this.tempDirectories[this.nextIndex()];
>
>     while (true) {
>         try {
>             if (file == null) {
>                 File newFile = new File(directory, ".tmp_" + UUID.randomUUID());
>                 OutputStream out = Files.newOutputStream(newFile.toPath(),
>                         StandardOpenOption.CREATE_NEW);
>                 return RefCountedFile.newFile(newFile, out);
>             }
>
>             OutputStream out = Files.newOutputStream(file.toPath(),
>                     StandardOpenOption.APPEND);
>             return RefCountedFile.restoredFile(file, out, file.length());
>         } catch (FileAlreadyExistsException var5) {
>         }
>     }
> }
>
>
> Is the S3RecoverableWriter path even supported for BucketingSink?
>
> Manish
>
>
On Wed, May 15, 2019 at 6:05 PM Ken Krugler wrote:
>
>> Hi Manish,
>>
>> Are you sure this is an exception that’s actually killing the job?
>>
>> Asking because https://issues.apache.org/jira/browse/BEANUTILS-477 talks
>> about Commons Beanutils logging this exception, but it’s a warning vs.
>> something being thrown up the stack.
>>
>> — Ken
>>
>> On May 15, 2019, at 3:50 PM, Manish Bellani wrote:
>>
>> Hey friends,
>>
>> Thanks for all the work you have been doing on Flink. I have been trying

Re: Error While Initializing S3A FileSystem

2019-05-15 Thread Ken Krugler
Hi Manish,

It’s best to start a new thread if you have a new question - see
https://home.apache.org/~hossman/#threadhijack for reasons why…

Regards,

— Ken


> On May 15, 2019, at 4:46 PM, Manish Bellani wrote:
> 
> Hi Ken,
> 
> Thanks for the quick response; you are actually right, the job seems to be
> running even after that error appears. It was crashing earlier (due to
> fs.s3a.multipart.size being too high), and I confused it with this error since
> that was the first one popping up and the OOM wasn't apparent immediately.
> 
> I do have a subsequent question, though, if you don't mind me asking it in
> the same thread. So... if I'm reading the BucketingSink code correctly, then
> if I supply core-site.xml with the following contents, would it not pick the
> S3RecoverableWriter code path?:
> 
> <configuration>
>
>   <property>
>     <name>fs.s3.impl</name>
>     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>   </property>
>
>   <property>
>     <name>fs.s3a.fast.upload</name>
>     <value>true</value>
>     <description>
>       Use the incremental block upload mechanism with
>       the buffering mechanism set in fs.s3a.fast.upload.buffer.
>       The number of threads performing uploads in the filesystem is defined
>       by fs.s3a.threads.max; the queue of waiting uploads limited by
>       fs.s3a.max.total.tasks.
>       The size of each buffer is set by fs.s3a.multipart.size.
>     </description>
>   </property>
>
>   <property>
>     <name>fs.s3a.fast.upload.buffer</name>
>     <value>array</value>
>     <description>
>       The buffering mechanism to use when using S3A fast upload
>       (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
>       This configuration option has no effect if fs.s3a.fast.upload is false.
>
>       "disk" will use the directories listed in fs.s3a.buffer.dir as
>       the location(s) to save data prior to being uploaded.
>
>       "array" uses arrays in the JVM heap.
>
>       "bytebuffer" uses off-heap memory within the JVM.
>
>       Both "array" and "bytebuffer" will consume memory in a single stream
>       up to the number of blocks set by:
>       fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.
>       If using either of these mechanisms, keep this value low.
>
>       The total number of threads performing work across all threads is set
>       by fs.s3a.threads.max, with fs.s3a.max.total.tasks values setting the
>       number of queued work items.
>     </description>
>   </property>
>
>   <property>
>     <name>fs.s3a.multipart.size</name>
>     <value>10M</value>
>     <description>
>       How big (in bytes) to split upload or copy operations up into.
>       A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
>     </description>
>   </property>
>
>   <property>
>     <name>fs.s3a.fast.upload.active.blocks</name>
>     <value>8</value>
>     <description>
>       Maximum number of blocks a single output stream can have active
>       (uploading, or queued to the central FileSystem instance's pool
>       of queued operations).
>
>       This stops a single stream overloading the shared thread pool.
>     </description>
>   </property>
>
>   <property>
>     <name>fs.s3a.aws.credentials.provider</name>
>     <value>com.amazonaws.auth.EnvironmentVariableCredentialsProvider</value>
>   </property>
>
> </configuration>
> 
> I say that because I don't see any files being written under the `/tmp` directory
> with a pattern like ".tmp_UUID", which is what RefCountedTmpFileCreator is
> supposed to create for staging writes to S3 (and which is wired in by
> org.apache.flink.fs.s3.common.FlinkS3FileSystem):
> 
> public RefCountedFile apply(File file) throws IOException {
>     File directory = this.tempDirectories[this.nextIndex()];
>
>     while (true) {
>         try {
>             if (file == null) {
>                 File newFile = new File(directory, ".tmp_" + UUID.randomUUID());
>                 OutputStream out = Files.newOutputStream(newFile.toPath(),
>                         StandardOpenOption.CREATE_NEW);
>                 return RefCountedFile.newFile(newFile, out);
>             }
>
>             OutputStream out = Files.newOutputStream(file.toPath(),
>                     StandardOpenOption.APPEND);
>             return RefCountedFile.restoredFile(file, out, file.length());
>         } catch (FileAlreadyExistsException var5) {
>         }
>     }
> }
> 
> 
> Is the S3RecoverableWriter path even supported for BucketingSink?
> 
> Manish
> 
> 
> On Wed, May 15, 2019 at 6:05 PM Ken Krugler wrote:
> Hi Manish,
> 
> Are you sure this is an exception that’s actually killing the job?
> 
> Asking because https://issues.apache.org/jira/browse/BEANUTILS-477 talks about Commons
> Beanutils logging this exception, but it’s a warning vs. something being 
> thrown up the stack.
> 
> — Ken
> 
>> On May 15, 2019, at 3:50 PM, Manish Bellani wrote:
>> 
>> 

Re: Error While Initializing S3A FileSystem

2019-05-15 Thread Manish Bellani
Hi Ken,

Thanks for the quick response; you are actually right, the job seems to be
running even after that error appears. It was crashing earlier (due to
fs.s3a.multipart.size being too high), and I confused it with this error
since that was the first one popping up and the OOM wasn't apparent
immediately.
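
A side note on that OOM, in case it helps anyone else: with fs.s3a.fast.upload.buffer
set to "array" or "bytebuffer", each open output stream can buffer up to
fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks in memory, so a large
multipart size multiplies quickly across parallel sink instances. A minimal sketch to
sanity-check that bound against core-site.xml (this assumes Hadoop's Configuration
class is on the classpath; the class name below is just for illustration):

import org.apache.hadoop.conf.Configuration;

public class S3aBufferBound {
    public static void main(String[] args) {
        // Configuration picks up core-site.xml from the classpath by default.
        Configuration conf = new Configuration();

        // getLongBytes understands the K/M/G suffixes allowed for fs.s3a.multipart.size.
        long partSize = conf.getLongBytes("fs.s3a.multipart.size", 100L * 1024 * 1024);
        int activeBlocks = conf.getInt("fs.s3a.fast.upload.active.blocks", 4);

        // Upper bound on what one "array"/"bytebuffer" output stream may hold in memory.
        long perStream = partSize * activeBlocks;
        System.out.printf("per-stream buffer bound: %d bytes (~%d MB)%n",
                perStream, perStream >> 20);
    }
}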

I do have a subsequent question, though, if you don't mind me asking it in
the same thread. So... if I'm reading the BucketingSink code correctly,
then if I supply core-site.xml with the following contents, would it not
pick the *S3RecoverableWriter* code path?:

<configuration>

  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>

  <property>
    <name>fs.s3a.fast.upload</name>
    <value>true</value>
    <description>
      Use the incremental block upload mechanism with
      the buffering mechanism set in fs.s3a.fast.upload.buffer.
      The number of threads performing uploads in the filesystem is defined
      by fs.s3a.threads.max; the queue of waiting uploads limited by
      fs.s3a.max.total.tasks.
      The size of each buffer is set by fs.s3a.multipart.size.
    </description>
  </property>

  <property>
    <name>fs.s3a.fast.upload.buffer</name>
    <value>array</value>
    <description>
      The buffering mechanism to use when using S3A fast upload
      (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
      This configuration option has no effect if fs.s3a.fast.upload is false.

      "disk" will use the directories listed in fs.s3a.buffer.dir as
      the location(s) to save data prior to being uploaded.

      "array" uses arrays in the JVM heap.

      "bytebuffer" uses off-heap memory within the JVM.

      Both "array" and "bytebuffer" will consume memory in a single stream
      up to the number of blocks set by:
      fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.
      If using either of these mechanisms, keep this value low.

      The total number of threads performing work across all threads is set
      by fs.s3a.threads.max, with fs.s3a.max.total.tasks values setting the
      number of queued work items.
    </description>
  </property>

  <property>
    <name>fs.s3a.multipart.size</name>
    <value>10M</value>
    <description>
      How big (in bytes) to split upload or copy operations up into.
      A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
    </description>
  </property>

  <property>
    <name>fs.s3a.fast.upload.active.blocks</name>
    <value>8</value>
    <description>
      Maximum number of blocks a single output stream can have active
      (uploading, or queued to the central FileSystem instance's pool
      of queued operations).

      This stops a single stream overloading the shared thread pool.
    </description>
  </property>

  <property>
    <name>fs.s3a.aws.credentials.provider</name>
    <value>com.amazonaws.auth.EnvironmentVariableCredentialsProvider</value>
  </property>

</configuration>

I say that because I don't see any files being written under the `/tmp`
directory with a pattern like ".tmp_UUID", which is what
RefCountedTmpFileCreator is supposed to create for staging writes to S3
(and which is wired in by org.apache.flink.fs.s3.common.FlinkS3FileSystem):

public RefCountedFile apply(File file) throws IOException {
    File directory = this.tempDirectories[this.nextIndex()];

    while (true) {
        try {
            if (file == null) {
                File newFile = new File(directory, ".tmp_" + UUID.randomUUID());
                OutputStream out = Files.newOutputStream(newFile.toPath(),
                        StandardOpenOption.CREATE_NEW);
                return RefCountedFile.newFile(newFile, out);
            }

            OutputStream out = Files.newOutputStream(file.toPath(),
                    StandardOpenOption.APPEND);
            return RefCountedFile.restoredFile(file, out, file.length());
        } catch (FileAlreadyExistsException var5) {
        }
    }
}
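
One way to narrow this down is to check which FileSystem implementation Flink
actually resolves for the s3:// scheme, and whether it exposes a RecoverableWriter
at all. A small sketch of that check (assuming flink-s3-fs-hadoop-1.7.2.jar is on
the lib path; the class name and bucket below are placeholders):

import java.net.URI;
import org.apache.flink.core.fs.FileSystem;

public class WhichS3FileSystem {
    public static void main(String[] args) throws Exception {
        // "my-bucket" is a placeholder; any bucket the credentials can reach works.
        FileSystem fs = FileSystem.get(new URI("s3://my-bucket/"));

        // With flink-s3-fs-hadoop on the classpath this should print
        // org.apache.flink.fs.s3.common.FlinkS3FileSystem.
        System.out.println(fs.getClass().getName());

        // FileSystems without recoverable-write support throw
        // UnsupportedOperationException here.
        System.out.println(fs.createRecoverableWriter().getClass().getName());
    }
}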


Is the S3RecoverableWriter path even supported for BucketingSink?

Manish


On Wed, May 15, 2019 at 6:05 PM Ken Krugler wrote:

> Hi Manish,
>
> Are you sure this is an exception that’s actually killing the job?
>
> Asking because https://issues.apache.org/jira/browse/BEANUTILS-477 talks
> about Commons Beanutils logging this exception, but it’s a warning vs.
> something being thrown up the stack.
>
> — Ken
>
> On May 15, 2019, at 3:50 PM, Manish Bellani wrote:
>
> Hey friends,
>
> Thanks for all the work you have been doing on Flink. I have been trying
> to use BucketingSink (backed by S3AFileSystem) to write data to S3, and I'm
> running into some issues (which I suspect could be dependency/packaging
> related) that I'll try to describe here.
>
> The data pipeline is quite simple:
>
> Kafka -> KafkaConsumer (Source) -> BucketingSink (S3AFileSystem) -> S3
>
> *Environment:*
>
> Kubernetes
> Debian
> DockerImage: flink:1.7.2-hadoop28-scala_2.11
> Java 1.8
> Hadoop Version: 2.8.5
>
> I followed this dependency section:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#flink-for-hadoop-27
> to place the 

Re: Error While Initializing S3A FileSystem

2019-05-15 Thread Ken Krugler
Hi Manish,

Are you sure this is an exception that’s actually killing the job?

Asking because https://issues.apache.org/jira/browse/BEANUTILS-477 talks about Commons
Beanutils logging this exception, but it’s a warning vs. something being thrown 
up the stack.

— Ken

> On May 15, 2019, at 3:50 PM, Manish Bellani wrote:
> 
> Hey friends,
>
> Thanks for all the work you have been doing on Flink. I have been trying to
> use BucketingSink (backed by S3AFileSystem) to write data to S3, and I'm
> running into some issues (which I suspect could be dependency/packaging
> related) that I'll try to describe here.
> 
> The data pipeline is quite simple:
> 
> Kafka -> KafkaConsumer (Source) -> BucketingSink (S3AFileSystem) -> S3
>
> Environment:
> 
> Kubernetes
> Debian
> DockerImage: flink:1.7.2-hadoop28-scala_2.11
> Java 1.8
> Hadoop Version: 2.8.5
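
For reference, a minimal sketch of the pipeline described above, using the Flink 1.7
connector APIs (the topic, broker address, bucket path, and class name are
placeholders, not the actual job):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaToS3Job {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("group.id", "s3-writer");

        DataStream<String> events = env.addSource(
                new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props));

        // s3a:// routes through the Hadoop S3AFileSystem configured in core-site.xml.
        BucketingSink<String> sink = new BucketingSink<>("s3a://my-bucket/flink-output");
        sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH"));
        sink.setWriter(new StringWriter<>());
        sink.setBatchSize(128 * 1024 * 1024); // roll part files at ~128 MB

        events.addSink(sink);
        env.execute("kafka-to-s3");
    }
}
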
> I followed this dependency section:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#flink-for-hadoop-27
> to place the dependencies under /opt/flink/lib (with the exception that my
> Hadoop version and its dependencies that I pull in are different).
> 
> Here are the dependencies I'm pulling in (excerpt from my Dockerfile)
> 
> RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.7.2.jar /opt/flink/lib/flink-s3-fs-hadoop-1.7.2.jar
> RUN wget -O /opt/flink/lib/hadoop-aws-2.8.5.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.8.5/hadoop-aws-2.8.5.jar
> RUN wget -O /opt/flink/lib/aws-java-sdk-s3-1.10.6.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.10.6/aws-java-sdk-s3-1.10.6.jar
> # Transitive dependencies of aws-java-sdk-s3
> RUN wget -O /opt/flink/lib/aws-java-sdk-core-1.10.6.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.10.6/aws-java-sdk-core-1.10.6.jar
> RUN wget -O /opt/flink/lib/aws-java-sdk-kms-1.10.6.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.10.6/aws-java-sdk-kms-1.10.6.jar
> RUN wget -O /opt/flink/lib/jackson-annotations-2.5.3.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.5.3/jackson-annotations-2.5.3.jar
> RUN wget -O /opt/flink/lib/jackson-core-2.5.3.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.5.3/jackson-core-2.5.3.jar
> RUN wget -O /opt/flink/lib/jackson-databind-2.5.3.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.5.3/jackson-databind-2.5.3.jar
> RUN wget -O /opt/flink/lib/joda-time-2.8.1.jar http://central.maven.org/maven2/joda-time/joda-time/2.8.1/joda-time-2.8.1.jar
> RUN wget -O /opt/flink/lib/httpcore-4.3.3.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.3.3/httpcore-4.3.3.jar
> RUN wget -O /opt/flink/lib/httpclient-4.3.6.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.3.6/httpclient-4.3.6.jar
>  
> But when I submit the job, it throws this error during initialization of
> BucketingSink/S3AFileSystem:
>
> java.beans.IntrospectionException: bad write method arg count: public final void org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)
>     at java.beans.PropertyDescriptor.findPropertyType(PropertyDescriptor.java:657)
>     at java.beans.PropertyDescriptor.setWriteMethod(PropertyDescriptor.java:327)
>     at java.beans.PropertyDescriptor.<init>(PropertyDescriptor.java:139)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector.createFluentPropertyDescritor(FluentPropertyBeanIntrospector.java:178)
>     at