Hi Manish,

It’s best to start a new thread if you have a new question - see
https://home.apache.org/~hossman/#threadhijack for reasons why…
Regards,

— Ken

> On May 15, 2019, at 4:46 PM, Manish Bellani <manish.bell...@gmail.com> wrote:
>
> Hi Ken,
>
> Thanks for the quick response. You are actually right, the job seems to be
> running even after that error appears. It was crashing earlier (due to
> fs.s3a.multipart.size being too high) and I confused that with this error,
> since this was the first one popping out and the OOM wasn't immediately
> apparent.
>
> I do have a follow-up question, though, if you don't mind me asking it in
> the same thread. If I'm reading the BucketingSink code correctly, then if I
> supply a core-site.xml with the following contents, would it not pick the
> S3RecoverableWriter code path?
>
> <configuration>
>
>   <property>
>     <name>fs.s3.impl</name>
>     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>   </property>
>
>   <property>
>     <name>fs.s3a.fast.upload</name>
>     <value>true</value>
>     <description>
>       Use the incremental block upload mechanism with the buffering
>       mechanism set in fs.s3a.fast.upload.buffer. The number of threads
>       performing uploads in the filesystem is defined by
>       fs.s3a.threads.max; the queue of waiting uploads is limited by
>       fs.s3a.max.total.tasks. The size of each buffer is set by
>       fs.s3a.multipart.size.
>     </description>
>   </property>
>
>   <property>
>     <name>fs.s3a.fast.upload.buffer</name>
>     <value>array</value>
>     <description>
>       The buffering mechanism to use when using S3A fast upload
>       (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
>       This configuration option has no effect if fs.s3a.fast.upload is
>       false.
>
>       "disk" will use the directories listed in fs.s3a.buffer.dir as
>       the location(s) to save data prior to being uploaded.
>
>       "array" uses arrays in the JVM heap.
>
>       "bytebuffer" uses off-heap memory within the JVM.
>
>       Both "array" and "bytebuffer" will consume memory in a single
>       stream up to the number of blocks set by:
>
>       fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.
>
>       If using either of these mechanisms, keep this value low.
>
>       The total number of threads performing work across all streams is
>       set by fs.s3a.threads.max, with fs.s3a.max.total.tasks setting the
>       number of queued work items.
>     </description>
>   </property>
>
>   <property>
>     <name>fs.s3a.multipart.size</name>
>     <value>10M</value>
>     <description>
>       How big (in bytes) to split upload or copy operations up into.
>       A suffix from the set {K,M,G,T,P} may be used to scale the numeric
>       value.
>     </description>
>   </property>
>
>   <property>
>     <name>fs.s3a.fast.upload.active.blocks</name>
>     <value>8</value>
>     <description>
>       Maximum number of blocks a single output stream can have active
>       (uploading, or queued to the central FileSystem instance's pool of
>       queued operations).
>
>       This stops a single stream overloading the shared thread pool.
>     </description>
>   </property>
>
>   <property>
>     <name>fs.s3a.aws.credentials.provider</name>
>     <value>com.amazonaws.auth.EnvironmentVariableCredentialsProvider</value>
>   </property>
>
> </configuration>
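>
> (As an aside, a back-of-envelope check on the settings above, using the
> formula from the fs.s3a.fast.upload.buffer description - my own sketch,
> not from the job code:)
>
>     // With "array" buffering, each open output stream can pin up to
>     // fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks of JVM heap.
>     long multipartSize = 10L * 1024 * 1024; // fs.s3a.multipart.size = 10M
>     int activeBlocks = 8;                   // fs.s3a.fast.upload.active.blocks
>     long perStreamBytes = multipartSize * activeBlocks; // 80 MB per open stream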
>
> I say that because I don't see any files being written under the `/tmp`
> directory with a pattern like ".tmp_UUID", which is what
> RefCountedTmpFileCreator is supposed to create for staging writes to S3
> (and which is wired in by org.apache.flink.fs.s3.common.FlinkS3FileSystem):
>
>     public RefCountedFile apply(File file) throws IOException {
>         File directory = this.tempDirectories[this.nextIndex()];
>
>         while (true) {
>             try {
>                 if (file == null) {
>                     // Fresh spill file: create a new ".tmp_<UUID>" file,
>                     // retrying on a name collision (caught below).
>                     File newFile = new File(directory, ".tmp_" + UUID.randomUUID());
>                     OutputStream out =
>                         Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
>                     return RefCountedFile.newFile(newFile, out);
>                 }
>
>                 // Recovery path: re-open an existing staging file for append.
>                 OutputStream out =
>                     Files.newOutputStream(file.toPath(), StandardOpenOption.APPEND);
>                 return RefCountedFile.restoredFile(file, out, file.length());
>             } catch (FileAlreadyExistsException ignored) {
>                 // Name collision on CREATE_NEW; loop and try a new UUID.
>             }
>         }
>     }
>
> Is the S3RecoverableWriter path even supported for BucketingSink?
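>
> (For comparison, a StreamingFileSink setup - the sink built on the
> RecoverableWriter API - would look roughly like this; a minimal, untested
> sketch, where the bucket path is a placeholder:)
>
>     import org.apache.flink.api.common.serialization.SimpleStringEncoder;
>     import org.apache.flink.core.fs.Path;
>     import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>
>     // Assumes flink-s3-fs-hadoop is on the classpath to serve the s3:// scheme.
>     StreamingFileSink<String> sink = StreamingFileSink
>         .forRowFormat(new Path("s3://my-bucket/output"),   // placeholder bucket
>                       new SimpleStringEncoder<String>("UTF-8"))
>         .build();
>     // Attached to an existing DataStream<String> via stream.addSink(sink).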
>
> Manish
>
> On Wed, May 15, 2019 at 6:05 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
>
> Hi Manish,
>
> Are you sure this is an exception that’s actually killing the job?
>
> Asking because https://issues.apache.org/jira/browse/BEANUTILS-477 talks
> about Commons Beanutils logging this exception, but it’s a warning vs.
> something being thrown up the stack.
>
> — Ken
>
>> On May 15, 2019, at 3:50 PM, Manish Bellani <manish.bell...@gmail.com> wrote:
>>
>> Hey Friends,
>>
>> Thanks for all the work you have been doing on Flink. I have been trying
>> to use BucketingSink (backed by S3AFileSystem) to write data to S3, and
>> I'm running into some issues (which I suspect could be
>> dependency/packaging related) that I'll try to describe here.
>>
>> The data pipeline is quite simple:
>>
>> Kafka -> KafkaConsumer (Source) -> BucketingSink (S3AFileSystem) -> S3
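>>
>> (In code, the wiring is roughly the following - a simplified, untested
>> sketch; the topic, broker list, bucket path, and bucketer format are
>> placeholders:)
>>
>>     import java.util.Properties;
>>     import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>     import org.apache.flink.streaming.api.datastream.DataStream;
>>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>     import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
>>     import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
>>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>>
>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>     Properties props = new Properties();
>>     props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
>>
>>     // Kafka source feeding the bucketing sink backed by S3AFileSystem.
>>     DataStream<String> stream = env.addSource(
>>         new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props));
>>
>>     BucketingSink<String> sink = new BucketingSink<>("s3a://my-bucket/output");
>>     sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH"));
>>     stream.addSink(sink);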
>>
>> Environment:
>>
>> - Kubernetes
>> - Debian
>> - Docker image: flink:1.7.2-hadoop28-scala_2.11
>> - Java 1.8
>> - Hadoop version: 2.8.5
>>
>> I followed this dependency section:
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#flink-for-hadoop-27
>> to place the dependencies under /opt/flink/lib (with the exception that my
>> Hadoop version and the dependencies I pull in for it are different).
>>
>> Here are the dependencies I'm pulling in (excerpt from my Dockerfile):
>>
>> RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.7.2.jar /opt/flink/lib/flink-s3-fs-hadoop-1.7.2.jar
>> RUN wget -O /opt/flink/lib/hadoop-aws-2.8.5.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.8.5/hadoop-aws-2.8.5.jar
>> RUN wget -O /opt/flink/lib/aws-java-sdk-s3-1.10.6.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.10.6/aws-java-sdk-s3-1.10.6.jar
>> # Transitive dependencies of aws-java-sdk-s3
>> RUN wget -O /opt/flink/lib/aws-java-sdk-core-1.10.6.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.10.6/aws-java-sdk-core-1.10.6.jar
>> RUN wget -O /opt/flink/lib/aws-java-sdk-kms-1.10.6.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.10.6/aws-java-sdk-kms-1.10.6.jar
>> RUN wget -O /opt/flink/lib/jackson-annotations-2.5.3.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.5.3/jackson-annotations-2.5.3.jar
>> RUN wget -O /opt/flink/lib/jackson-core-2.5.3.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.5.3/jackson-core-2.5.3.jar
>> RUN wget -O /opt/flink/lib/jackson-databind-2.5.3.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.5.3/jackson-databind-2.5.3.jar
>> RUN wget -O /opt/flink/lib/joda-time-2.8.1.jar http://central.maven.org/maven2/joda-time/joda-time/2.8.1/joda-time-2.8.1.jar
>> RUN wget -O /opt/flink/lib/httpcore-4.3.3.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.3.3/httpcore-4.3.3.jar
>> RUN wget -O /opt/flink/lib/httpclient-4.3.6.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.3.6/httpclient-4.3.6.jar
>>
>> But when I submit the job, it throws this error during initialization of
>> BucketingSink/S3AFileSystem:
>>
>> java.beans.IntrospectionException: bad write method arg count: public final void org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)
>>     at java.beans.PropertyDescriptor.findPropertyType(PropertyDescriptor.java:657)
>>     at java.beans.PropertyDescriptor.setWriteMethod(PropertyDescriptor.java:327)
>>     at java.beans.PropertyDescriptor.<init>(PropertyDescriptor.java:139)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector.createFluentPropertyDescritor(FluentPropertyBeanIntrospector.java:178)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector.introspect(FluentPropertyBeanIntrospector.java:141)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.fetchIntrospectionData(PropertyUtilsBean.java:2245)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.getIntrospectionData(PropertyUtilsBean.java:2226)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.getPropertyDescriptor(PropertyUtilsBean.java:954)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.PropertyUtilsBean.isWriteable(PropertyUtilsBean.java:1478)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.isPropertyWriteable(BeanHelper.java:521)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.initProperty(BeanHelper.java:357)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.initBeanProperties(BeanHelper.java:273)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.initBean(BeanHelper.java:192)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper$BeanCreationContextImpl.initBean(BeanHelper.java:669)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.DefaultBeanFactory.initBeanInstance(DefaultBeanFactory.java:162)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.DefaultBeanFactory.createBean(DefaultBeanFactory.java:116)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.createBean(BeanHelper.java:459)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.createBean(BeanHelper.java:479)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.beanutils.BeanHelper.createBean(BeanHelper.java:492)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.builder.BasicConfigurationBuilder.createResultInstance(BasicConfigurationBuilder.java:447)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.builder.BasicConfigurationBuilder.createResult(BasicConfigurationBuilder.java:417)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.builder.BasicConfigurationBuilder.getConfiguration(BasicConfigurationBuilder.java:285)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsConfig.loadFirst(MetricsConfig.java:119)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsConfig.create(MetricsConfig.java:98)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsSystemImpl.configure(MetricsSystemImpl.java:478)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsSystemImpl.start(MetricsSystemImpl.java:188)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsSystemImpl.init(MetricsSystemImpl.java:163)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInstrumentation.getMetricsSystem(S3AInstrumentation.java:251)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInstrumentation.registerAsMetricsSource(S3AInstrumentation.java:264)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInstrumentation.<init>(S3AInstrumentation.java:243)
>>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:244)
>>     at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:125)
>>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1224)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:432)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:376)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> Some googling about "bad write method arg count" reveals that it could
>> potentially be related to a beanutils issue, but I'm not entirely sure.
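>>
>> (The error itself is easy to reproduce in isolation - a minimal,
>> hypothetical sketch, not from my job code: java.beans requires a write
>> method to take exactly one argument, while
>> AbstractConfiguration.setProperty(String, Object) takes two.)
>>
>>     import java.beans.PropertyDescriptor;
>>
>>     public class BadWriteMethodArgCountDemo {
>>         // Stand-in for AbstractConfiguration: a set<Name> method with two args.
>>         public static class Conf {
>>             public void setProperty(String key, Object value) { }
>>         }
>>
>>         public static void main(String[] args) throws Exception {
>>             // Forcing a PropertyDescriptor onto the two-arg method throws:
>>             // java.beans.IntrospectionException: bad write method arg count: ...
>>             new PropertyDescriptor("property", null,
>>                 Conf.class.getMethod("setProperty", String.class, Object.class));
>>         }
>>     }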
>>
>> I've hunted through all the jars that are on the classpath:
>>
>> /usr/lib/jvm/java-8-openjdk-amd64/bin/java -Djavax.net.ssl.trustStore=/etc/ssl/java-certs/cacerts -XX:+UseG1GC -Xms5530M -Xmx5530M -XX:MaxDirectMemorySize=8388607T -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml -classpath /opt/flink/lib/aws-java-sdk-core-1.10.6.jar:/opt/flink/lib/aws-java-sdk-kms-1.10.6.jar:/opt/flink/lib/aws-java-sdk-s3-1.10.6.jar:/opt/flink/lib/flink-metrics-datadog-1.6.2.jar:/opt/flink/lib/flink-python_2.11-1.7.2.jar:/opt/flink/lib/flink-s3-fs-hadoop-1.7.2.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.7.2.jar:/opt/flink/lib/hadoop-aws-2.8.5.jar:/opt/flink/lib/httpclient-4.3.6.jar:/opt/flink/lib/httpcore-4.3.3.jar:/opt/flink/lib/jackson-annotations-2.5.3.jar:/opt/flink/lib/jackson-core-2.5.3.jar:/opt/flink/lib/jackson-databind-2.5.3.jar:/opt/flink/lib/joda-time-2.8.1.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.11-1.7.2.jar::/opt/flink/conf: org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir /opt/flink/conf
>>
>> and I see that `FluentPropertyBeanIntrospector` is contained within the
>> following two jars:
>>
>> flink-s3-fs-hadoop-1.7.2.jar:org/apache/flink/fs/shaded/hadoop3/org/apache/commons/beanutils/FluentPropertyBeanIntrospector.class
>> flink-shaded-hadoop2-uber-1.7.2.jar:org/apache/commons/beanutils/FluentPropertyBeanIntrospector.class
>>
>> Both of those jars are packaged as part of the Flink distribution I'm
>> using.
>> I can't think of any other options atm, other than suspecting that this
>> could potentially be some incompatible transitive dependency issue. I
>> would love to get some advice from y'all to see if this is a packaging
>> bug or something else on my side.
>>
>> Thanks,
>>
>> Manish

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra