Thanks, Ken. That makes sense! I'll start a new thread.

On Wed, May 15, 2019 at 7:12 PM Ken Krugler <kkrugler_li...@transpac.com>
wrote:

> Hi Manish,
>
> It’s best to start a new thread if you have a new question - see
> https://home.apache.org/~hossman/#threadhijack for reasons why…
>
> Regards,
>
> — Ken
>
>
> On May 15, 2019, at 4:46 PM, Manish Bellani <manish.bell...@gmail.com>
> wrote:
>
> Hi Ken,
>
> Thanks for the quick response, you are actually right, the job seems to be
> running even after that error appears. It was crashing earlier (due to
> fs.s3a.multipart.size being too high) and I confused it with this error
> since that was the first one popping out and OOM wasn't apparent
> immediately.
>
> I do have a subsequent question though if you don't mind me asking this
> question in the same thread. So... if I'm reading the  BucketingSink code
> correctly then if I supply the core-site.xml with following contents, would
> it not pick the *S3RecoverableWriter* code path?:
>
> <configuration>
>
>     <property>
>         <name>fs.s3.impl</name>
>         <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>     </property>
>
>     <property>
>         <name>fs.s3a.fast.upload</name>
>         <value>true</value>
>         <description>
>             Use the incremental block upload mechanism with
>             the buffering mechanism set in fs.s3a.fast.upload.buffer.
>             The number of threads performing uploads in the filesystem is
> defined
>             by fs.s3a.threads.max; the queue of waiting uploads limited by
>             fs.s3a.max.total.tasks.
>             The size of each buffer is set by fs.s3a.multipart.size.
>         </description>
>     </property>
>
>     <property>
>         <name>fs.s3a.fast.upload.buffer</name>
>         <value>array</value>
>         <description>
>             The buffering mechanism to use when using S3A fast upload
>             (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
>             This configuration option has no effect if fs.s3a.fast.upload
> is false.
>
>             "disk" will use the directories listed in fs.s3a.buffer.dir as
>             the location(s) to save data prior to being uploaded.
>
>             "array" uses arrays in the JVM heap
>
>             "bytebuffer" uses off-heap memory within the JVM.
>
>             Both "array" and "bytebuffer" will consume memory in a single
> stream up to the number
>             of blocks set by:
>
>             fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.
>
>             If using either of these mechanisms, keep this value low
>
>             The total number of threads performing work across all threads
> is set by
>             fs.s3a.threads.max, with fs.s3a.max.total.tasks values setting
> the number of queued
>             work items.
>         </description>
>     </property>
>
>     <property>
>         <name>fs.s3a.multipart.size</name>
>         <value>10M</value>
>         <description>How big (in bytes) to split upload or copy operations
> up into.
>             A suffix from the set {K,M,G,T,P} may be used to scale the
> numeric value.
>         </description>
>     </property>
>
>     <property>
>         <name>fs.s3a.fast.upload.active.blocks</name>
>         <value>8</value>
>         <description>
>             Maximum Number of blocks a single output stream can have
>             active (uploading, or queued to the central FileSystem
>             instance's pool of queued operations.
>
>             This stops a single stream overloading the shared thread pool.
>         </description>
>     </property>
>
>     <property>
>       <name>fs.s3a.aws.credentials.provider</name>
>
> <value>com.amazonaws.auth.EnvironmentVariableCredentialsProvider</value>
>     </property>
>
> </configuration>
>
> I say that because I don't see any files being written under `/tmp`
> directory with the pattern like ".tmp_UUID", which what
> RefCountedTmpFileCreator is supposed to create for staging writes to s3
> (which is wired in by org.apache.flink.fs.s3.common.FlinkS3FileSystem):
>
>     public RefCountedFile apply(File file) throws IOException {
>        File directory = this.tempDirectories[this.nextIndex()];
>
>         while(true) {
>             try {
>                 if (file == null) {
>                     File newFile = new File(directory, ".tmp_" +
> UUID.randomUUID());
>                     OutputStream out =
> Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
>                     return RefCountedFile.newFile(newFile, out);
>                 }
>
>                 OutputStream out = Files.newOutputStream(file.toPath(),
> StandardOpenOption.APPEND);
>                 return RefCountedFile.restoredFile(file, out,
> file.length());
>             } catch (FileAlreadyExistsException var5) {
>             }
>         }
>     }
>
>
> Is S3RecoverableWriter path even supported for BucketingSink?
>
> Manish
>
>
> On Wed, May 15, 2019 at 6:05 PM Ken Krugler <kkrugler_li...@transpac.com>
> wrote:
>
>> Hi Manish,
>>
>> Are you sure this is an exception that’s actually killing the job?
>>
>> Asking because https://issues.apache.org/jira/browse/BEANUTILS-477 talks
>> about Commons Beanutils logging this exception, but it’s a warning vs.
>> something being thrown up the stack.
>>
>> — Ken
>>
>> On May 15, 2019, at 3:50 PM, Manish Bellani <manish.bell...@gmail.com>
>> wrote:
>>
>> hey Friends,
>>
>> Thanks for all the work you have been doing on flink, I have been trying
>> to use BucketingSink (backed by S3AFileSystem) to write data to s3 and I'm
>> running into some issues (which I suspect could be dependency/packaging
>> related) that'd try to describe here.
>>
>> The data pipeline is quite simple:
>>
>> Kafka -> KafkaConsumer (Source) -> BucketingSink (S3AFileSystem) -> S3
>>
>> *Environment:*
>>
>> Kubernetes
>> Debian
>> DockerImage: flink:1.7.2-hadoop28-scala_2.11
>> Java 1.8
>> Hadoop Version: 2.8.5
>>
>> I followed this dependency section:
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#flink-for-hadoop-27
>> to place the dependencies under /opt/flink/lib (with an exception that my
>> Hadoop version and it's dependencies that I pull in are different).
>>
>> Here are the dependencies I'm pulling in (excerpt from my Dockerfile)
>>
>> RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.7.2.jar 
>> /opt/flink/lib/flink-s3-fs-hadoop-1.7.2.jar
>> RUN wget -O /opt/flink/lib/hadoop-aws-2.8.5.jar 
>> https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.8.5/hadoop-aws-2.8.5.jar
>> RUN wget -O /opt/flink/lib/aws-java-sdk-s3-1.10.6.jar 
>> http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.10.6/aws-java-sdk-s3-1.10.6.jar
>> #Transitive 
>> <http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.10.6/aws-java-sdk-s3-1.10.6.jar#Transitive>
>>  Dependency of aws-java-sdk-s3
>> RUN wget -O /opt/flink/lib/aws-java-sdk-core-1.10.6.jar 
>> http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.10.6/aws-java-sdk-core-1.10.6.jar
>> RUN wget -O /opt/flink/lib/aws-java-sdk-kms-1.10.6.jar 
>> http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.10.6/aws-java-sdk-kms-1.10.6.jar
>> RUN wget -O /opt/flink/lib/jackson-annotations-2.5.3.jar 
>> http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.5.3/jackson-annotations-2.5.3.jar
>> RUN wget -O /opt/flink/lib/jackson-core-2.5.3.jar 
>> http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.5.3/jackson-core-2.5.3.jar
>> RUN wget -O /opt/flink/lib/jackson-databind-2.5.3.jar 
>> http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.5.3/jackson-databind-2.5.3.jar
>> RUN wget -O /opt/flink/lib/joda-time-2.8.1.jar 
>> http://central.maven.org/maven2/joda-time/joda-time/2.8.1/joda-time-2.8.1.jar
>> RUN wget -O /opt/flink/lib/httpcore-4.3.3.jar 
>> http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.3.3/httpcore-4.3.3.jar
>> RUN wget -O /opt/flink/lib/httpclient-4.3.6.jar 
>> http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.3.6/httpclient-4.3.6.jar
>>
>>
>>
>> But when I submit the job, it throws this error during initialization of
>> BucketingSink/S3AFileSystem:
>>
>> java.beans.IntrospectionException: bad write method arg count: public final 
>> void 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)
>>     at 
>> java.beans.PropertyDescriptor.findPropertyType(PropertyDescriptor.java:657)
>>     at 
>> java.beans.PropertyDescriptor.setWriteMethod(PropertyDescriptor.java:327)
>>     at java.beans.PropertyDescriptor.<init>(PropertyDescriptor.java:139)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector.createFluentPropertyDescritor(FluentPropertyBeanIntrospector.java:178)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector.introspect(FluentPropertyBeanIntrospector.java:141)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.fetchIntrospectionData(PropertyUtilsBean.java:2245)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.getIntrospectionData(PropertyUtilsBean.java:2226)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.getPropertyDescriptor(PropertyUtilsBean.java:954)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.isWriteable(PropertyUtilsBean.java:1478)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.isPropertyWriteable(BeanHelper.java:521)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.initProperty(BeanHelper.java:357)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.initBeanProperties(BeanHelper.java:273)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.initBean(BeanHelper.java:192)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper$BeanCreationContextImpl.initBean(BeanHelper.java:669)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.DefaultBeanFactory.initBeanInstance(DefaultBeanFactory.java:162)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.DefaultBeanFactory.createBean(DefaultBeanFactory.java:116)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.createBean(BeanHelper.java:459)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.createBean(BeanHelper.java:479)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.createBean(BeanHelper.java:492)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.builder.BasicConfigurationBuilder.createResultInstance(BasicConfigurationBuilder.java:447)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.builder.BasicConfigurationBuilder.createResult(BasicConfigurationBuilder.java:417)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.builder.BasicConfigurationBuilder.getConfiguration(BasicConfigurationBuilder.java:285)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsConfig.loadFirst(MetricsConfig.java:119)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsConfig.create(MetricsConfig.java:98)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsSystemImpl.configure(MetricsSystemImpl.java:478)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsSystemImpl.start(MetricsSystemImpl.java:188)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsSystemImpl.init(MetricsSystemImpl.java:163)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInstrumentation.getMetricsSystem(S3AInstrumentation.java:251)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInstrumentation.registerAsMetricsSource(S3AInstrumentation.java:264)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInstrumentation.<init>(S3AInstrumentation.java:243)
>>     at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:244)
>>     at 
>> org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:125)
>>     at 
>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
>>     at 
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1224)
>>     at 
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:432)
>>     at 
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:376)
>>     at 
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>     at 
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>     at 
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>     at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>>
>>
>>
>>
>>
>> Some googling about "bad write method arg count" reveals that it could
>> potentially be related to a beanutils issue, but I'm not entirely sure.
>> I've hunted through all the jars that are on the classpath:
>>
>> /usr/lib/jvm/java-8-openjdk-amd64/bin/java -Djavax.net 
>> <http://djavax.net/>.ssl.trustStore=/etc/ssl/java-certs/cacerts -XX:+UseG1GC 
>> -Xms5530M -Xmx5530M -XX:MaxDirectMemorySize=8388607T 
>> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties 
>> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml 
>> -classpath 
>> /opt/flink/lib/aws-java-sdk-core-1.10.6.jar:/opt/flink/lib/aws-java-sdk-kms-1.10.6.jar:/opt/flink/lib/aws-java-sdk-s3-1.10.6.jar:/opt/flink/lib/flink-metrics-datadog-1.6.2.jar:/opt/flink/lib/flink-python_2.11-1.7.2.jar:/opt/flink/lib/flink-s3-fs-hadoop-1.7.2.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.7.2.jar:/opt/flink/lib/hadoop-aws-2.8.5.jar:/opt/flink/lib/httpclient-4.3.6.jar:/opt/flink/lib/httpcore-4.3.3.jar:/opt/flink/lib/jackson-annotations-2.5.3.jar:/opt/flink/lib/jackson-core-2.5.3.jar:/opt/flink/lib/jackson-databind-2.5.3.jar:/opt/flink/lib/joda-time-2.8.1.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.11-1.7.2.jar::/opt/flink/conf:
>>  org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir 
>> /opt/flink/conf
>>
>>
>>
>> and i see that `FluentPropertyBeanIntrospector` is contained within the
>> following two jars:
>>
>> flink-s3-fs-hadoop-1.7.2.jar:org/apache/flink/fs/shaded/hadoop3/org/apache/commons/beanutils/FluentPropertyBeanIntrospector.class
>> flink-shaded-hadoop2-uber-1.7.2.jar:2019 
>> org/apache/commons/beanutils/FluentPropertyBeanIntrospector.class
>>
>>
>>  both of those jars are packaged as part of the flink distribution I'm
>> using. I can't think of any other options atm other than thinking that this
>> could potentially be some incompatible transitive dependency issue. I would
>> love to get some advice from y'all to see if this is a packaging bug or
>> something else on my side.
>>
>>
>> Thanks
>>
>> Manish
>>
>>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>

Reply via email to