HFiles are sorted files, you can't just append to them. Can you explain your usecase? Hfiles are mainly used for bulk loading, can't you just have several files and bulk load them in order?
On Sat, 14 Mar 2020, 19:23 Gautham Acharya, <[email protected]> wrote: > I have a process in Apache Spark that attempts to write HFiles to S3 in a > batched process. I want the resulting HFiles in the same directory, as they > are in the same column family. However, I'm getting a 'directory already > exists error' when I try to run this on AWS EMR. How can I write Hfiles via > Spark as an 'append', like I can do via a CSV? > > The batch writing function looks like this: > > for col_group in split_cols: > processed_chunk = > batch_write_pandas_udf_for_col_aggregation(joined_dataframe, col_group, > pandas_udf_func, group_by_args) > > hfile_writer.write_hfiles(processed_chunk, output_path, > zookeeper_ip, table_name, > constants.DEFAULT_COL_FAMILY) > > The actual function to write the Hfiles is this: > > rdd.saveAsNewAPIHadoopFile(output_path, > constants.OUTPUT_FORMAT_CLASS, > keyClass=constants.KEY_CLASS, > valueClass=constants.VALUE_CLASS, > keyConverter=constants.KEY_CONVERTER, > valueConverter=constants.VALUE_CONVERTER, > conf=conf) > > The exception I'm getting: > > > Called with arguments: Namespace(job_args=['matrix_path=/tmp/matrix.csv', > 'metadata_path=/tmp/metadata.csv', > 'output_path=s3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles', > 'group_by_args=cluster_id', 'zookeeper_ip=ip-172-30-5-36.ec2.internal', > 'table_name=test_wide_matrix__8c6b2f09-2ba0-42fc-af8e-656188c0186a'], > job_name='matrix_transformations') > > job_args_tuples: [['matrix_path', '/tmp/matrix.csv'], ['metadata_path', > '/tmp/metadata.csv'], ['output_path', > 's3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles'], > ['group_by_args', 'cluster_id'], ['zookeeper_ip', > 'ip-172-30-5-36.ec2.internal'], ['table_name', > 'test_wide_matrix__8c6b2f09-2ba0-42fc-af8e-656188c0186a']] > > Traceback (most recent call last): > > File "/mnt/var/lib/hadoop/steps/s-2ZIOR335HH9TR/main.py", line 56, in > <module> > > job_module.transform(spark, **job_args) > > File > "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/jobs/matrix_transformations/job.py", > line 93, in transform > > File > "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/jobs/matrix_transformations/job.py", > line 73, in write_split_columnwise_transform > > File > "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/output_handler/hfile_writer.py", > line 44, in write_hfiles > > File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1438, > in saveAsNewAPIHadoopFile > > File > "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line > 1257, in __call__ > > File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line > 63, in deco > > File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", > line 328, in get_return_value > > py4j.protocol.Py4JJavaError: An error occurred while calling > z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile. > > : org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory > s3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles/median > already exists > > at > org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146) > > at org.apache.spark.internal.io > .HadoopMapReduceWriteConfigUtil.assertConf(SparkHadoopWriter.scala:393) > > at org.apache.spark.internal.io > .SparkHadoopWriter$.write(SparkHadoopWriter.scala:71) > > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1083) > > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081) > > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081) > > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > > at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) > > at > org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1081) > > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:1000) > > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:991) > > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:991) > > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > > at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) > > at > org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:991) > > at > org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:584) > > at > org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > > at > py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > > at py4j.Gateway.invoke(Gateway.java:282) > > at > py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > > at py4j.commands.CallCommand.execute(CallCommand.java:79) > > at py4j.GatewayConnection.run(GatewayConnection.java:238) > > at java.lang.Thread.run(Thread.java:748) > >
