Re: Monitoring S3 Bucket with Spark Streaming

2016-04-09 Thread Nezih Yigitbasi
Natu, Benjamin,
With this mechanism you can configure notifications on *buckets* for various
event types (if you only care about certain key prefixes, take a look at
object key name filtering in the docs), and these events can then be
published to SNS, SQS, or Lambda. I think using SQS as the source here
will let you process these notifications from your Spark Streaming job.
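
For illustration, a minimal sketch of that SQS route, assuming a queue already
subscribed to the bucket's s3:ObjectCreated:* events and the AWS SDK for Java
(v1) on the classpath; the queue URL, storage level, and long-poll setting are
placeholders, and the notification JSON still has to be parsed for bucket/key:

  import com.amazonaws.services.sqs.AmazonSQSClientBuilder
  import com.amazonaws.services.sqs.model.ReceiveMessageRequest
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.receiver.Receiver
  import scala.collection.JavaConverters._

  // Custom receiver that polls an SQS queue fed by S3 event notifications.
  class S3EventReceiver(queueUrl: String)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK) {

    override def onStart(): Unit = {
      new Thread("sqs-poller") {
        override def run(): Unit = {
          val sqs = AmazonSQSClientBuilder.defaultClient()
          while (!isStopped()) {
            val request = new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(20)
            sqs.receiveMessage(request).getMessages.asScala.foreach { m =>
              store(m.getBody)  // raw S3 event notification JSON
              sqs.deleteMessage(queueUrl, m.getReceiptHandle)
            }
          }
        }
      }.start()
    }

    override def onStop(): Unit = ()  // polling thread exits once isStopped() is true
  }

  // ssc.receiverStream(new S3EventReceiver(queueUrl)) then yields a DStream of
  // notification payloads; the referenced objects can be read in the usual way.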

Nezih

On Sat, Apr 9, 2016 at 10:58 AM Natu Lauchande  wrote:

>
> Do you know if textFileStream can see if new files are created underneath
> a whole bucket?
> Only at the level of the folder that you specify; it doesn't look into
> subfolders. So the approach would be to detect everything directly under
> the path, e.g. s3://bucket/path/2016040902_data.csv.
>
>
> Also, will Spark Streaming not pick up these files again on the following
> run knowing that it already picked them up or do we have to store state
> somewhere, like the last run date and time to compare against?
> Yes, it does this automatically. It will only pick up files created after
> the streaming app has started.
>
>
> Thanks,
> Natu
>
>
> On Sat, Apr 9, 2016 at 4:44 PM, Benjamin Kim  wrote:
>
>> Natu,
>>
>> Do you know if textFileStream can see if new files are created underneath
>> a whole bucket? For example, if the bucket name is incoming and new files
>> underneath it are 2016/04/09/00/00/01/data.csv and
>> 2016/04/09/00/00/02/data.csv, will these files be picked up? Also, will
>> Spark Streaming not pick up these files again on the following run knowing
>> that it already picked them up or do we have to store state somewhere, like
>> the last run date and time to compare against?
>>
>> Thanks,
>> Ben
>>
>> On Apr 8, 2016, at 9:15 PM, Natu Lauchande  wrote:
>>
>> Hi Benjamin,
>>
>> I have done it. The critical configuration items are the ones below:
>>
>>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.impl",
>> "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
>>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId",
>> AccessKeyId)
>>
>> ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey",
>> AWSSecretAccessKey)
>>
>>   val inputS3Stream = ssc.textFileStream("s3://example_bucket/folder")
>>
>> This code will probe for newly created S3 files in every batch interval.
>>
>> Thanks,
>> Natu
>>
>> On Fri, Apr 8, 2016 at 9:14 PM, Benjamin Kim  wrote:
>>
>>> Has anyone monitored an S3 bucket or directory using Spark Streaming and
>>> pulled any new files to process? If so, can you provide basic Scala coding
>>> help on this?
>>>
>>> Thanks,
>>> Ben
>>>
>>>
>>>
>>>
>>
>>
>
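
For completeness, a minimal end-to-end sketch built around Natu's quoted
snippet: bucket, prefix, batch interval, and credential handling are
illustrative, s3n:// is used to match the fs.s3n.impl setting, and note that
only the given folder level is watched, only for files created after the app
starts.

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf().setAppName("s3-file-monitor")
  val ssc = new StreamingContext(conf, Seconds(60))  // 60s batch interval (illustrative)

  ssc.sparkContext.hadoopConfiguration.set("fs.s3n.impl",
    "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
  // Credentials set as in the quoted snippet; placeholders shown here.
  ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<ACCESS_KEY_ID>")
  ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<SECRET_ACCESS_KEY>")

  // Watches only this folder level and only sees files that appear after start.
  val inputS3Stream = ssc.textFileStream("s3n://example_bucket/folder")
  inputS3Stream.foreachRDD { (rdd, time) =>
    println(s"$time: picked up ${rdd.count()} new lines")
  }

  ssc.start()
  ssc.awaitTermination()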


Re: Monitoring S3 Bucket with Spark Streaming

2016-04-09 Thread Nezih Yigitbasi
While it is doable in Spark, S3 also supports notifications:
http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html


On Fri, Apr 8, 2016 at 9:15 PM Natu Lauchande  wrote:

> Hi Benjamin,
>
> I have done it. The critical configuration items are the ones below:
>
>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.impl",
> "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
>   ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId",
> AccessKeyId)
>
> ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey",
> AWSSecretAccessKey)
>
>   val inputS3Stream =  ssc.textFileStream("s3://example_bucket/folder")
>
> This code will probe for newly created S3 files in every batch interval.
>
> Thanks,
> Natu
>
> On Fri, Apr 8, 2016 at 9:14 PM, Benjamin Kim  wrote:
>
>> Has anyone monitored an S3 bucket or directory using Spark Streaming and
>> pulled any new files to process? If so, can you provide basic Scala coding
>> help on this?
>>
>> Thanks,
>> Ben
>>
>>
>>
>>
>


Re: Amazon S3 Access Error

2016-04-06 Thread Nezih Yigitbasi
Did you take a look at this jira?
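
For anyone else hitting this: in Hadoop 2.6 the S3 filesystem classes moved
into the separate hadoop-aws module, so with the "pre-built for Hadoop 2.6"
package the hadoop-aws jar (and its AWS SDK dependency) likely needs to be put
on the classpath, e.g. via --jars; once it is there, the filesystem can also
be wired up explicitly, as in the textFileStream thread above. A sketch with
placeholder credentials and a hypothetical path:

  // Assumes the hadoop-aws jar (plus the matching AWS SDK jar) is on the
  // classpath; otherwise the NativeS3FileSystem class below cannot be found.
  sc.hadoopConfiguration.set("fs.s3n.impl",
    "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
  sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<ACCESS_KEY_ID>")         // placeholder
  sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<SECRET_ACCESS_KEY>") // placeholder

  val rdd = sc.textFile("s3n://some-bucket/some-prefix/")  // hypothetical path
  println(rdd.count())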

On Wed, Apr 6, 2016 at 6:44 PM Joice Joy  wrote:

> I am facing an S3 access error when using Spark 1.6.1 pre-built for Hadoop
> 2.6 or later.
> But if I use Spark 1.6.1 pre-built for Hadoop 2.4 or later, it works.
> Am I missing something that needs to be configured with Hadoop 2.6?
>
> PFB the error:
> java.io.IOException: No FileSystem for scheme: s3n
> at
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
> at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
> at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
> at
> org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
> at
> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
> at
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
> at
> org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
> at org.apache.spark.rdd.RDD.count(RDD.scala:1157)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:28)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:33)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:35)
> at $iwC$$iwC$$iwC$$iwC$$iwC.(:37)
> at $iwC$$iwC$$iwC$$iwC.(:39)
> at $iwC$$iwC$$iwC.(:41)
> at $iwC$$iwC.(:43)
> at $iwC.(:45)
> at (:47)
> at .(:51)
> at .()
> at .(:7)
> at .()
> at $print()
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
> at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
> at
> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
> at
> org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
> at
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
> at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
> at
> org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
> at
> org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
> at org.apache.spark.repl.SparkILoop.org
> $apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
> at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> at org.apache.spark.repl.SparkILoop.org
> $apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
> at 

Re: SparkContext.stop() takes too long to complete

2016-03-18 Thread Nezih Yigitbasi
Hadoop 2.4.0. Here are the relevant logs from executor 1136:

16/03/18 21:26:58 INFO mapred.SparkHadoopMapRedUtil: attempt_201603182126_0276_m_000484_0: Committed
16/03/18 21:26:58 INFO executor.Executor: Finished task 484.0 in stage 276.0 (TID 59663). 1080 bytes result sent to driver
16/03/18 21:38:18 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
16/03/18 21:38:18 INFO storage.DiskBlockManager: Shutdown hook called
16/03/18 21:38:18 INFO util.ShutdownHookManager: Shutdown hook called

On Fri, Mar 18, 2016 at 4:21 PM Ted Yu <yuzhih...@gmail.com> wrote:

Which version of Hadoop do you use?
>
> bq. Requesting to kill executor(s) 1136
>
> Can you find more information on executor 1136?
>
> Thanks
>
> On Fri, Mar 18, 2016 at 4:16 PM, Nezih Yigitbasi <
> nyigitb...@netflix.com.invalid> wrote:
>
>> Hi Spark experts,
>> I am using Spark 1.5.2 on YARN with dynamic allocation enabled. I see in
>> the driver/application master logs that the app is marked as SUCCEEDED and
>> then SparkContext.stop() is called. However, this stop sequence takes more
>> than 10 minutes to complete, and the YARN resource manager kills the
>> application master because it didn't receive a heartbeat within the last
>> 10 minutes. Any ideas about what may be going on?
>>
>> Here are the relevant logs:
>>
>> 16/03/18 21:26:58 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0
>> 16/03/18 21:26:58 INFO spark.SparkContext: Invoking stop() from shutdown hook
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static/sql,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution/json,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/json,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static/sql,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution/json,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/json,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
>> 16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
>> 16/0

SparkContext.stop() takes too long to complete

2016-03-18 Thread Nezih Yigitbasi
Hi Spark experts,
I am using Spark 1.5.2 on YARN with dynamic allocation enabled. I see in
the driver/application master logs that the app is marked as SUCCEEDED and
then SparkContext.stop() is called. However, this stop sequence takes more
than 10 minutes to complete, and the YARN resource manager kills the
application master because it didn't receive a heartbeat within the last
10 minutes. Any ideas about what may be going on?
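
For reference, a minimal sketch of the dynamic-allocation settings in play,
with values implied by the log lines below (executors removed after 60 idle
seconds); this only shows where the knobs live, it is not a fix for the hang:

  import org.apache.spark.SparkConf

  // Illustrative only: the 60-second idle timeout is Spark's default and matches
  // the "idle for 60 seconds" messages in the logs below.
  val conf = new SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")  // required for dynamic allocation
    .set("spark.dynamicAllocation.executorIdleTimeout", "60s")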

Here are the relevant logs:

16/03/18 21:26:58 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0
16/03/18 21:26:58 INFO spark.SparkContext: Invoking stop() from shutdown hook
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static/sql,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static/sql,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
16/03/18 21:26:58 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
16/03/18 21:26:58 INFO ui.SparkUI: Stopped Spark web UI at http://10.143.240.240:52706
16/03/18 21:27:58 INFO cluster.YarnClusterSchedulerBackend: Requesting to kill executor(s) 1135
16/03/18 21:27:58 INFO yarn.YarnAllocator: Driver requested a total number of 208 executor(s).
16/03/18 21:27:58 INFO yarn.ApplicationMaster$AMEndpoint: Driver requested to kill executor(s) 1135.
16/03/18 21:27:58 INFO spark.ExecutorAllocationManager: Removing executor 1135 because it has been idle for 60 seconds (new desired total will be 208)
16/03/18 21:27:58 INFO cluster.YarnClusterSchedulerBackend: Requesting to kill executor(s) 1123
16/03/18 21:27:58 INFO yarn.YarnAllocator: Driver requested a total number of 207 executor(s).
16/03/18

Re: question about combining small parquet files

2015-11-30 Thread Nezih Yigitbasi
This looks interesting, thanks Ruslan. But compaction with Hive is as
simple as an INSERT OVERWRITE statement, since Hive supports
CombineFileInputFormat. Is it possible to do the same with Spark?
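
Not an answer from this thread, but a rough Spark-side equivalent of that
compaction pass, as a sketch with illustrative paths and partition count:
read the small files, coalesce, and write them back out to a new location.

  // Hypothetical compaction job; paths and the target partition count are illustrative.
  val df = sqlContext.read.parquet("hdfs:///warehouse/my_table")
  df.coalesce(32)
    .write
    .mode("overwrite")
    .parquet("hdfs:///warehouse/my_table_compacted")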

On Thu, Nov 26, 2015 at 9:47 AM, Ruslan Dautkhanov <dautkha...@gmail.com>
wrote:

> An interesting approach to compacting small files was discussed recently:
>
> http://blog.cloudera.com/blog/2015/11/how-to-ingest-and-query-fast-data-with-impala-without-kudu/
>
>
> AFAIK Spark supports views too.
>
>
> --
> Ruslan Dautkhanov
>
> On Thu, Nov 26, 2015 at 10:43 AM, Nezih Yigitbasi <
> nyigitb...@netflix.com.invalid> wrote:
>
>> Hi Spark people,
>> I have a Hive table that has a lot of small parquet files and I am
>> creating a data frame out of it to do some processing, but since I have a
>> large number of splits/files my job creates a lot of tasks, which I don't
>> want. Basically what I want is the same functionality that Hive provides,
>> that is, to combine these small input splits into larger ones by specifying
>> a max split size setting. Is this currently possible with Spark?
>>
>> I look at coalesce() but with coalesce I can only control the number
>> of output files not their sizes. And since the total input dataset size
>> can vary significantly in my case, I cannot just use a fixed partition
>> count as the size of each output file can get very large. I then looked for
>> getting the total input size from an rdd to come up with some heuristic to
>> set the partition count, but I couldn't find any ways to do it (without
>> modifying the spark source).
>>
>> Any help is appreciated.
>>
>> Thanks,
>> Nezih
>>
>> PS: this email is the same as my previous email as I learned that my
>> previous email ended up as spam for many people since I sent it through
>> nabble, sorry for the double post.
>>
>
>


question about combining small parquet files

2015-11-26 Thread Nezih Yigitbasi
Hi Spark people,
I have a Hive table that has a lot of small parquet files and I am
creating a data frame out of it to do some processing, but since I have a
large number of splits/files my job creates a lot of tasks, which I don't
want. Basically what I want is the same functionality that Hive provides,
that is, to combine these small input splits into larger ones by specifying
a max split size setting. Is this currently possible with Spark?

I looked at coalesce(), but with coalesce I can only control the number
of output files, not their sizes. And since the total input dataset size
can vary significantly in my case, I cannot just use a fixed partition
count, as the size of each output file can get very large. I then looked
into getting the total input size from an RDD to come up with a heuristic
for setting the partition count, but I couldn't find a way to do it
(without modifying the Spark source).
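
One possible workaround (a sketch, not something from this thread) for that
size-based heuristic: if the table's underlying directory is known, the Hadoop
FileSystem API can report the total input size, from which a coalesce() target
can be derived. The path and the 256 MB target below are assumptions.

  import java.net.URI
  import org.apache.hadoop.fs.{FileSystem, Path}

  val inputPath = "hdfs:///warehouse/my_table"      // assumed table location
  val targetBytesPerPartition = 256L * 1024 * 1024  // ~256 MB per partition (illustrative)

  // Sum up the bytes under the input directory and derive a partition count.
  val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
  val fs = FileSystem.get(new URI(inputPath), hadoopConf)
  val totalBytes = fs.getContentSummary(new Path(inputPath)).getLength
  val numPartitions = math.max(1, (totalBytes / targetBytesPerPartition).toInt)

  val df = sqlContext.read.parquet(inputPath).coalesce(numPartitions)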

Any help is appreciated.

Thanks,
Nezih

PS: this email is the same as my previous one; I learned that the previous
email ended up as spam for many people since I sent it through Nabble.
Sorry for the double post.