The point here is: the SparkSession is not available on executors. So you have to use the appropriate storage clients (HDFS, S3, etc.) directly.
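A minimal sketch of that pattern (not from the thread; all names here are hypothetical): keep the parsing logic as a pure function that takes an injected `open`-style callable, then build the storage client once per partition inside `mapPartitions`. The `pyarrow.fs.HadoopFileSystem` call is one possible HDFS client, shown as an assumption.

```python
import json

def read_and_parse(paths, open_fn):
    """Read each path with the injected open_fn and parse it as JSON.

    open_fn is a parameter so the same code runs with the builtin open()
    in a local test and with an HDFS/S3 client's open inside an executor.
    """
    records = []
    for path in paths:
        with open_fn(path) as handle:
            records.append(json.loads(handle.read()))
    return records

# Inside an executor this would be wired up roughly like this
# (hypothetical sketch; pyarrow's HadoopFileSystem is one possible client):
#
# def process_partition(paths):
#     import pyarrow.fs as pafs
#     hdfs = pafs.HadoopFileSystem("default")   # one client per partition
#     return read_and_parse(paths, hdfs.open_input_stream)
#
# records = spark.sparkContext.parallelize(file_names, 32) \
#                             .mapPartitions(process_partition)
```

Injecting `open_fn` also makes the function trivially testable with a stub, which is the same trick Gary uses below with his `s3_client` parameter.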
On Fri, Sep 22, 2017 at 1:44 AM, lucas.g...@gmail.com <lucas.g...@gmail.com> wrote:

> I'm not sure what you're doing, but I have in the past used Spark to
> consume a manifest file and then execute a .mapPartitions on the result,
> like this:
>
> def map_key_to_event(s3_events_data_lake):
>
>     def _map_key_to_event(event_key_list, s3_client=None):
>         print("Events in list")
>         start = time.time()
>
>         return_list = []
>
>         # Build the client once per partition; a stub can be injected
>         # via the s3_client parameter for testing.
>         if s3_client is None:
>             s3_client = boto3.Session().client('s3')
>
>         for event_key in event_key_list:
>             try:
>                 response = s3_client.get_object(Bucket=s3_events_data_lake,
>                                                 Key=event_key)
>                 contents = response['Body'].read().decode('utf-8')
>                 entity = json.loads(contents)
>                 event_type = json.loads(entity["Message"])["type"]
>                 entity["Message"] = json.loads(entity["Message"])
>                 # json.dumps here because Spark doesn't have a good json
>                 # datatype.
>                 return_list.append((event_type, json.dumps(entity)))
>             except Exception:
>                 print("Key: {k} did not yield a valid object".format(
>                     k=event_key))
>
>         end = time.time()
>         print('time elapsed:')
>         print(end - start)
>
>         return return_list
>
>     return _map_key_to_event
>
>
> pkeys = spark.sparkContext.parallelize(full_list_for_time_slice, 32)
> print("partitions: ")
> print(pkeys.getNumPartitions())
> events = pkeys.mapPartitions(map_key_to_event(s3_events_data_lake))
>
>
> In this case I'm loading heterogeneous json files with wildly different
> schemas, then saving them into a parquet file per event type (i.e. turning
> one big heterogeneous dump into numerous smaller homogeneous dumps).
>
> I'm sure this isn't the only or even the best way to do it.
>
> The underlying issue is that you're trying to violate the programming
> model. The model in this case consists of telling the driver what you want
> and then having the executors go do it.
> Spark Context is a driver-level abstraction; it kind of doesn't make sense
> in the executor context. The executor is acting on behalf of the driver
> and shouldn't need a back reference to it. You'd end up with some
> interesting execution graphs.
>
> This is a common pattern in Spark as far as I can tell, i.e. calling a map
> and then doing something with the items in the executor, either computing
> or enriching. My case above is a bit weird, and I'm not certain it's the
> right mechanism, in that I'm literally taking a manifest file and turning
> it into 'n' actual records.
>
> Also, if you're going to be constructing a connection string / JDBC call /
> S3 client... you really don't want to use a straight .map(func). You'll
> end up instantiating a connection on every iteration.
>
> Hope this is somewhat helpful.
>
> Gary
>
> On 21 September 2017 at 06:31, Weichen Xu <weichen...@databricks.com>
> wrote:
>
>> Spark does not allow executor code to use the `sparkSession`.
>> But I think you can move all the json files to one directory, and then
>> run:
>>
>> ```
>> spark.read.json("/path/to/jsonFileDir")
>> ```
>>
>> And if you want to get the filename at the same time, you can use:
>>
>> ```
>> spark.sparkContext.wholeTextFiles("/path/to/jsonFileDir")...
>> ```
>>
>> On Thu, Sep 21, 2017 at 9:18 PM, Riccardo Ferrari <ferra...@gmail.com>
>> wrote:
>>
>>> Depends on your use-case, however broadcasting
>>> <https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables>
>>> could be a better option.
>>>
>>> On Thu, Sep 21, 2017 at 2:03 PM, Chackravarthy Esakkimuthu <
>>> chaku.mi...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I want to know how to pass the sparkSession from the driver to the
>>>> executors.
>>>>
>>>> I have a Spark program (batch job) which does the following:
>>>>
>>>> #################
>>>>
>>>> val spark = SparkSession.builder().appName("SampleJob")
>>>>   .config("spark.master", "local")
>>>>   .getOrCreate()
>>>>
>>>> val df = ... // a DataFrame which has a list of file names (HDFS)
>>>>
>>>> df.foreach { fileName =>
>>>>   *spark.read.json(fileName)*
>>>>   // ...... some logic here ....
>>>> }
>>>>
>>>> #################
>>>>
>>>> *spark.read.json(fileName) --- this fails as it runs in an executor.
>>>> When I put it outside the foreach, i.e. in the driver, it works.*
>>>>
>>>> I am trying to use spark (the SparkSession) in an executor, but it is
>>>> not visible outside the driver. I want to read HDFS files inside the
>>>> foreach; how do I do it?
>>>>
>>>> Can someone help with how to do this?
>>>>
>>>> Thanks,
>>>> Chackra
>>>>
>>>
>>>
>>
>
-- 
Best Regards,
Ayan Guha
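For the original question, the usual fix is to not read inside the foreach at all: bring the file names back to the driver and issue a single read for all of them, since `DataFrameReader.json` accepts a list of paths in PySpark. A sketch in PySpark (the question's snippet is Scala, but the same idea applies there with a varargs call); the column name `fileName` is a hypothetical assumption:

```python
def collect_paths(rows, column="fileName"):
    """Pure helper: turn collected Row objects (or dicts) into a list of
    path strings, skipping empty entries."""
    return [row[column] for row in rows if row[column]]

# On the driver (sketch; df is the DataFrame of HDFS file names from the
# question):
#
# paths = collect_paths(df.collect())
# all_events = spark.read.json(paths)  # one distributed read for all files
```

This keeps the SparkSession on the driver where it belongs, and lets Spark parallelize the read itself instead of looping file by file.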