The point here is that the SparkSession is not available on executors, so you
have to use the appropriate storage clients instead.
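
For the HDFS case in the original question, a minimal sketch of that idea
(assuming pyarrow is available on the executors; the namenode host/port and
file_name_rdd below are illustrative, not from this thread):

```
import json
import pyarrow.fs as pafs

def read_json_files(paths):
    # One HDFS client per partition, created on the executor itself.
    hdfs = pafs.HadoopFileSystem(host="namenode", port=8020)  # illustrative
    for path in paths:
        with hdfs.open_input_file(path) as f:
            yield json.loads(f.read().decode("utf-8"))

# file_name_rdd is a hypothetical RDD of HDFS paths built on the driver.
parsed = file_name_rdd.mapPartitions(read_json_files)
```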

On Fri, Sep 22, 2017 at 1:44 AM, lucas.g...@gmail.com <lucas.g...@gmail.com>
wrote:

> I'm not sure what you're doing, but I have in the past used Spark to consume
> a manifest file and then execute a .mapPartitions on the result, like this:
>
>
> import json
> import time
>
> import boto3
>
>
> def map_key_to_event(s3_events_data_lake):
>
>     # Default of None lets each partition build its own client; a stub
>     # client can be injected here for testing.
>     def _map_key_to_event(event_key_list, s3_client=None):
>         print("Events in list")
>         start = time.time()
>
>         return_list = []
>
>         if s3_client is None:
>             s3_client = boto3.Session().client('s3')
>
>         for event_key in event_key_list:
>             contents = None
>             try:
>                 response = s3_client.get_object(Bucket=s3_events_data_lake,
>                                                 Key=event_key)
>                 contents = response['Body'].read().decode('utf-8')
>                 entity = json.loads(contents)
>                 message = json.loads(entity["Message"])
>                 event_type = message["type"]
>                 entity["Message"] = message
>                 # json.dumps here because Spark doesn't have a good json
>                 # datatype.
>                 return_list.append((event_type, json.dumps(entity)))
>             except Exception:
>                 print("Key: {k} did not yield a valid object: "
>                       "{o}".format(k=event_key, o=contents))
>
>         end = time.time()
>         print('time elapsed:')
>         print(end - start)
>
>         return return_list
>
>     return _map_key_to_event
>
>
> pkeys = spark.sparkContext.parallelize(full_list_for_time_slice, 32)
> print("partitions: ")
> print(pkeys.getNumPartitions())
> events = pkeys.mapPartitions(map_key_to_event(s3_events_data_lake))
>
>
>
>
>
> In this case I'm loading heterogeneous JSON files with wildly different
> schemas, then saving them into a parquet file per event type (i.e. turning
> one big heterogeneous dump into numerous smaller homogeneous dumps).
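>
> A rough sketch of the write step (the output bucket/path here is
> illustrative, not from my actual job):
>
> event_types = events.keys().distinct().collect()
> for event_type in event_types:
>     subset = events.filter(lambda kv, t=event_type: kv[0] == t).values()
>     # spark.read.json accepts an RDD of JSON strings, so this rebuilds a
>     # DataFrame per event type before writing it out.
>     spark.read.json(subset).write.mode("overwrite").parquet(
>         "s3a://some-bucket/events/{t}/".format(t=event_type))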
>
> I'm sure this isn't the only or even best way to do it.
>
> The underlying issue is that you're trying to violate the programming
> model.  The model in this case consists of telling the driver what you want
> and then having the executors go do it.
>
> The SparkContext is a driver-level abstraction; it doesn't really make sense
> in the executor context. The executor is acting on behalf of the driver and
> shouldn't need a back reference to it. You'd end up with some interesting
> execution graphs.
>
> This is a common pattern in Spark as far as I can tell, i.e. calling a map
> and then doing something with the items in the executor, either computing or
> enriching. My case above is a bit weird, and I'm not certain it's the right
> mechanism, in that I'm literally taking a manifest file and turning it into
> 'n' actual records.
>
> Also, if you're going to be constructing a connection string / JDBC call /
> S3 client, you really don't want to use a straight .map(func): you'll end up
> instantiating a connection for every record rather than once per partition
> (see the sketch below).
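>
> A minimal sketch of the difference (the bucket name and key_rdd are
> illustrative):
>
> import boto3
>
> def fetch_one(key):
>     # With .map, this builds a brand new client for every single record.
>     client = boto3.Session().client('s3')
>     return client.get_object(Bucket='some-bucket', Key=key)['Body'].read()
>
> def fetch_partition(keys):
>     # With .mapPartitions, one client is shared by every record in the
>     # partition.
>     client = boto3.Session().client('s3')
>     for key in keys:
>         yield client.get_object(Bucket='some-bucket', Key=key)['Body'].read()
>
> # per_record = key_rdd.map(fetch_one)                   # client per record
> per_partition = key_rdd.mapPartitions(fetch_partition)  # client per partition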
>
> Hope this is somewhat helpful.
>
> Gary
>
> On 21 September 2017 at 06:31, Weichen Xu <weichen...@databricks.com>
> wrote:
>
>> Spark does not allow executor code to use the `sparkSession`.
>> But I think you can move all the JSON files into one directory and then run:
>>
>> ```
>> spark.read.json("/path/to/jsonFileDir")
>> ```
>> But if you want to get the file name at the same time, you can use
>> ```
>> spark.sparkContext.wholeTextFiles("/path/to/jsonFileDir")...
>> ```
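>>
>> A minimal sketch of that second approach (the directory path is the same
>> placeholder as above): wholeTextFiles gives (fileName, content) pairs, so
>> the file name stays available next to each parsed record.
>>
>> ```
>> import json
>>
>> pairs = spark.sparkContext.wholeTextFiles("/path/to/jsonFileDir")
>> parsed = pairs.map(lambda kv: (kv[0], json.loads(kv[1])))
>> ```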
>>
>> On Thu, Sep 21, 2017 at 9:18 PM, Riccardo Ferrari <ferra...@gmail.com>
>> wrote:
>>
>>> It depends on your use case; however, broadcasting
>>> <https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables>
>>> could be a better option.
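>>>
>>> A minimal sketch of that idea (the lookup file path, the "id" field, and
>>> some_rdd are all illustrative): read small reference data once on the
>>> driver, broadcast it, and use it inside executor code instead of touching
>>> the SparkSession there.
>>>
>>> ```
>>> rows = spark.read.json("/path/to/small_lookup.json").collect()
>>> lookup = spark.sparkContext.broadcast({r["id"]: r.asDict() for r in rows})
>>>
>>> enriched = some_rdd.map(lambda rec: (rec, lookup.value.get(rec)))
>>> ```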
>>>
>>> On Thu, Sep 21, 2017 at 2:03 PM, Chackravarthy Esakkimuthu <
>>> chaku.mi...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I want to know how to pass sparkSession from driver to executor.
>>>>
>>>> I have a Spark program (batch job) which does the following:
>>>>
>>>> #################
>>>>
>>>> val spark = SparkSession.builder().appName("SampleJob")
>>>>   .config("spark.master", "local").getOrCreate()
>>>>
>>>> val df = ... // dataframe containing a list of (hdfs) file names
>>>>
>>>> df.foreach { fileName =>
>>>>
>>>>       *spark.read.json(fileName)*
>>>>
>>>>       ...... some logic here....
>>>> }
>>>>
>>>> #################
>>>>
>>>>
>>>> *spark.read.json(fileName) --- this fails as it runs in the executor. When
>>>> I put it outside the foreach, i.e. in the driver, it works.*
>>>>
>>>> I am trying to use spark (the sparkSession) in the executor, but it is not
>>>> available outside the driver. I still want to read the hdfs files inside
>>>> the foreach; how do I do it?
>>>>
>>>> Can someone help with how to do this?
>>>>
>>>> Thanks,
>>>> Chackra
>>>>
>>>
>>>
>>
>


-- 
Best Regards,
Ayan Guha
