Posted to the mailing list too.
My process generates at most 150 files. As I said, moving files from the
temp folder to the target path takes longer for a table with many partitions
than for a table with fewer partitions. I'm not sure what the reason behind
this behavior is.
I tried writing files directly to S3 and then adding the partitions to the
Hive table. But the Spark job doesn't save a DataFrame with null values; I
get an IllegalArgumentException stating - found `null` instead of .
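For reference, here is a minimal sketch of the direct-to-S3 approach I'm attempting (column names, types, and the path are placeholders, not my real schema). The idea: a bare NULL in a SELECT has no type, so casting each NULL literal to the column's Hive type gives the writer a concrete schema, which should avoid the exception; MSCK REPAIR then registers the new partitions.

```python
# Sketch only: column names/types below are placeholders, not the real schema.
# A bare NULL in a SELECT has no type, so the file writer cannot infer a
# schema for that column; casting each NULL to the target column's type
# is the usual workaround.
def typed_projection(columns):
    """columns: list of (name, hive_type); hive_type None means a real column."""
    parts = []
    for name, hive_type in columns:
        if hive_type is None:
            parts.append(name)
        else:
            parts.append("CAST(NULL AS %s) AS %s" % (hive_type, name))
    return ", ".join(parts)

cols = [("col1", None), ("col2", "STRING"), ("col3", None), ("col4", None),
        ("col5", "INT"), ("partition_col1", None), ("partition_col2", None)]
select_sql = "SELECT %s FROM TEMP_VIEW" % typed_projection(cols)

def write_directly(session, table_name, target_path):
    """Write partitioned files straight to the target path, then let Hive
    discover them, instead of going through Hive's temp-then-move step."""
    df = session.sql(select_sql)
    (df.coalesce(50)
       .write
       .mode("overwrite")
       .partitionBy("partition_col1", "partition_col2")
       .parquet(target_path))
    session.sql("MSCK REPAIR TABLE %s" % table_name)
```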
On Mon, Nov 5, 2018 at 2:41 AM Jörn Franke wrote:
> Can you share it with the mailing list?
>
> I believe it would be more efficient to work in Spark just at the file
> level (without using Hive) and at the end let Hive discover the new files
> via MSCK repair.
> It could be that your process generates a lot of small files and this is
> very inefficient on Hadoop (try to have larger partitions at least 128M
> size)
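One rough way to act on the 128M suggestion above would be to derive the output file count from the input size instead of hard-coding coalesce(50). This is just a sketch; the input-size estimate is an assumption that would in practice come from listing the source files.

```python
# Sketch: pick an output file count that targets roughly 128 MB per file,
# instead of a hard-coded coalesce(50).
TARGET_BYTES = 128 * 1024 * 1024

def target_file_count(total_input_bytes):
    # Ceiling division, with a floor of one file.
    return max(1, -(-total_input_bytes // TARGET_BYTES))

# Usage idea (estimated_bytes would come from the source files - assumption):
#   df.coalesce(target_file_count(estimated_bytes)).write...
```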
>
> Am 05.11.2018 um 08:58 schrieb Bhaskar Ebbur :
>
> Here's code with correct data frame.
>
> self.session = SparkSession \
>     .builder \
>     .appName(self.app_name) \
>     .config("spark.dynamicAllocation.enabled", "false") \
>     .config("hive.exec.dynamic.partition.mode", "nonstrict") \
>     .config("mapreduce.fileoutputcommitter.algorithm.version", "2") \
>     .config("hive.load.dynamic.partitions.thread", "10") \
>     .config("hive.mv.files.thread", "30") \
>     .config("fs.trash.interval", "0") \
>     .enableHiveSupport() \
>     .getOrCreate()
>
> columns_with_default = "col1, NULL as col2, col2, col4, NULL as col5, " \
>     "partition_col1, partition_col2"
> source_data_df_to_write = self.session.sql(
>     "SELECT %s FROM TEMP_VIEW" % columns_with_default)
>
> source_data_df_to_write \
>     .coalesce(50) \
>     .createOrReplaceTempView("TEMP_VIEW")
>
> table_name_abs = "%s.%s" % (self.database, self.target_table)
> self.session.sql(
>     "INSERT OVERWRITE TABLE %s "
>     "PARTITION (%s) "
>     "SELECT %s FROM TEMP_VIEW" % (
>         table_name_abs, "partition_col1, partition_col2",
>         columns_with_default))
>
>
>
>
> On Sun, Nov 4, 2018 at 11:30 PM Bhaskar Ebbur wrote:
>
>> Here's some sample code.
>>
>> self.session = SparkSession \
>> .builder \
>> .appName(self.app_name) \
>> .config("spark.dynamicAllocation.enabled", "false") \
>> .config("hive.exec.dynamic.partition.mode", "nonstrict") \
>> .config("mapreduce.fileoutputcommitter.algorithm.version",
>> "2") \
>> .config("hive.load.dynamic.partitions.thread", "10") \
>> .config("hive.mv.files.thread", "30") \
>> .config("fs.trash.interval", "0") \
>> .enableHiveSupport()
>>
>> columns_with_default = "col1, NULL as col2, col2, col4, NULL as col5,
>> partition_col1, partition_col2"
>> source_data_df_to_write = self.session.sql(
>> "SELECT %s, %s, %s as %s, %s as %s FROM TEMP_VIEW" %
>> (columns_with_default))
>> source_data_df_to_write\
>> .coalesce(50)\
>> .createOrReplaceTempView("TEMP_VIEW")
>>
>> table_name_abs = "%s.%s" % (self.database, self.target_table)
>> self.session.sql(
>> "INSERT OVERWRITE TABLE %s "
>> "PARTITION (%s) "
>> "SELECT %s FROM TEMP_VIEW" % (
>> table_name_abs, "partition_col1, partition_col2",
>> columns_with_default))
>>
>>
>> On Sun, Nov 4, 2018 at 11:08 PM Jörn Franke wrote:
>>
>>> Can you share some relevant source code?
>>>
>>>
>>> > Am 05.11.2018 um 07:58 schrieb ehbhaskar :
>>> >
>>> > I have a pyspark job that inserts data into a Hive partitioned table
>>> > using an `INSERT OVERWRITE` statement.
>>> >
>>> > The Spark job loads data quickly (in 15 mins) to a temp directory
>>> > (~/.hive-***) in S3. But it's very slow moving data from the temp
>>> > directory to the target path; that move takes more than 40 mins.
>>> >
>>> > I set the option mapreduce.fileoutputcommitter.algorithm.