[ 
https://issues.apache.org/jira/browse/SPARK-47156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marc Le Bihan updated SPARK-47156:
----------------------------------
    Description: 
I need first to know if I'm in front of a bug or not.

If it's the case, I'll manage to create a test to help you reproduce the case, 
but if it isn't, maybe Spark documentation could explain when 
{{sparkSession.getContext()}} can return {{{}null{}}}.

 

I'm willing to ease my development by separating :
 * parquet files management \{ checking existence, then loading them as cache, 
or saving data to them },
 * from dataset creation, when it doesn't exist yet, and should be constituted 
from scratch.

 

The method I'm using is this one:
{code:java}
{code:Java}
protected Dataset<Row> constitutionStandard(OptionsCreationLecture 
optionsCreationLecture,
   Supplier<Dataset<Row>> worker, CacheParqueteur<? extends TriParqueteurIF> 
cacheParqueteur) {
   OptionsCreationLecture options = optionsCreationLecture != null ? 
optionsCreationLecture : optionsCreationLecture();

   Dataset<Row> dataset = cacheParqueteur.call(options.useCache());
   return dataset == null ? 
cacheParqueteur.save(cacheParqueteur.appliquer(worker.get())) : dataset;
}
{code}
In case the dataset doesn't exist in parquet files (= cache) yet, it starts its 
creation by calling a {{worker.get()}} that is a {{Supplier}} of 
{{{}Dataset<Row>{}}}.

 

A concrete usage is this one:
{code:java}
{code:Java}
public Dataset<Row> rowEtablissements(OptionsCreationLecture 
optionsCreationLecture, HistoriqueExecution historiqueExecution, int anneeCOG, 
int anneeSIRENE, boolean actifsSeulement, boolean communesValides, boolean 
nomenclaturesNAF2Valides) {
   OptionsCreationLecture options = optionsCreationLecture != null ? 
optionsCreationLecture : optionsCreationLecture();
   Supplier<Dataset<Row>> worker = () -> {
      super.setStageDescription(this.messageSource, 
"row.etablissements.libelle.long", "row.etablissements.libelle.court", 
anneeSIRENE, anneeCOG, actifsSeulement, communesValides, 
nomenclaturesNAF2Valides);
      
      Map<String, Integer> indexs = new HashMap<>();
      Dataset<Row> etablissements = 
etablissementsNonFiltres(optionsCreationLecture, anneeSIRENE);

      etablissements = etablissements.filter(
         (FilterFunction<Row>)etablissement -> 
this.validator.validationEtablissement(this.session, historiqueExecution, 
etablissement, actifsSeulement, nomenclaturesNAF2Valides, indexs));

      // Si le filtrage par communes valides a été demandé, l'appliquer.
      if (communesValides) {
         etablissements = rowRestreindreAuxCommunesValides(etablissements, 
anneeCOG, anneeSIRENE, indexs);
      }
      else {
         etablissements = etablissements.withColumn("codeDepartement", 
substring(CODE_COMMUNE.col(), 1, 2));
      }

      // Associer les libellés des codes APE/NAF.
      Dataset<Row> nomenclatureNAF = 
this.nafDataset.rowNomenclatureNAF(anneeSIRENE);
      etablissements = etablissements.join(nomenclatureNAF, 
etablissements.col("activitePrincipale").equalTo(nomenclatureNAF.col("codeNAF"))
 , "left_outer")
         .drop("codeNAF", "niveauNAF");

      // Le dataset est maintenant considéré comme valide, et ses champs 
peuvent être castés dans leurs types définitifs.
      return this.validator.cast(etablissements);
   };

   return constitutionStandard(options, () -> worker.get()
      .withColumn("partitionSiren", SIREN_ENTREPRISE.col().substr(1,2)),
      new CacheParqueteur<>(options, this.session,
         "etablissements", 
"annee_{0,number,#0}-actifs_{1}-communes_verifiees_{2}-nafs_verifies_{3}", 
DEPARTEMENT_SIREN_SIRET,
         anneeSIRENE, anneeCOG, actifsSeulement, communesValides));
} {code}
 

In the worker, a filter calls a {{validationEtablissement(SparkSession, 
HistoriqueExecution, Row, ...)}} on each row to perform complete checking 
(eight rules to check for an establishment validity).

When a check fails, along with a warning log, I'm also counting in 
{{historiqueExecution}} object the number of problems of that kind I've 
encountered.

That function increase a {{longAccumulator}} value, and create that accumulator 
first, that it stores in a {{{}Map<String, LongAccumulator> accumulators{}}},  
if needed.
{code:java}
{code:Java}
public void incrementerOccurrences(SparkSession session, String 
codeOuFormatMessage, boolean creerSiAbsent) {
   LongAccumulator accumulator = accumulators.get(codeOuFormatMessage);

   if (accumulator == null && creerSiAbsent) {
      accumulator = session.sparkContext().longAccumulator(codeOuFormatMessage);
      accumulators.put(codeOuFormatMessage, accumulator);
   }

   if (accumulator != null) {
      accumulator.add(1);
   }
}
{code}
 

Or at least, it should. But my problem is that it isn't the case.

During Dataset constitution :

*1)* If I initialize the {{historiqueExecution}} variable with the exhaustive 
list of messages it can have to count, +*before*+ the {{worker.get()}} is 
called by the {{constitutionStandard}} method, the dataset is perfectly 
constituted and I can dump my counters :
{code:java}
{code:Java}
historiqueExecution.setCodesMessages(this.session,
   "etablissement sans SIRET, peut être étranger : '{}'",
   "etablissement au SIRET '{}' invalide, écarté.",
   "etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET 
vaut : {}), écarté.",
   "etablissement au SIREN d'entreprise '{}' invalide, écarté.",
   "établissement de SIRET {} écarté : il n'a pas de nomemclature APE.",
   "établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue.",
   "établissement de SIRET {} écarté : il n'a pas de code APE.",
   "établissement de SIRET {} écarté : son code APE {} est invalide.",
   "établissement de SIRET {} écarté : son type de voie d'adresse principale, 
{}, est invalide",
   "établissement de SIRET {} écarté : son type de voie d'adresse secondaire, 
{}, est invalide",
   "établissement de SIRET {} écarté : sa date de création, {}, est invalide",
   "établissement de SIRET {} écarté : sa date de dernier traitement, {}, est 
invalide : {}",
   "établissement de SIRET {} écarté : sa date d'historisation, {}, est 
invalide");// code placeholder
{code}
{noformat}
etablissement au SIRET '{}' invalide, écarté. : 0
établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue. : 0
etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET 
vaut : {}), écarté. : 0
etablissement sans SIRET, peut être étranger : '{}' : 0
établissement de SIRET {} écarté : son code APE {} est invalide. : 0
établissement de SIRET {} écarté : il n'a pas de code APE. : 0
établissement de SIRET {} écarté : sa date d'historisation, {}, est invalide : 0
établissement de SIRET {} écarté : son type de voie d'adresse principale, {}, 
est invalide : 2
établissement de SIRET {} écarté : sa date de dernier traitement, {}, est 
invalide : {} : 0
etablissement au SIREN d'entreprise '{}' invalide, écarté. : 0
établissement de SIRET {} écarté : il n'a pas de nomemclature APE. : 0
établissement de SIRET {} écarté : son type de voie d'adresse secondaire, {}, 
est invalide : 0
établissement de SIRET {} écarté : sa date de création, {}, est invalide : 
0{noformat}
*2)* But if I don't initialize that list, and that I leave the creation of the 
missing {{longAccumulator}} to the worker itself, at runtime (filter time), 
then the :

{{session.sparkContext().longAccumulator(codeOuFormatMessage);}}

fails on a {{{}NullPointerException{}}}, as {{sparkContext()}} returns null.

 

My first question is : is it normal that, when taking the real actions to build 
the dataset, the spark session returns a {{null}} context?

 

  was:
I need first to know if I'm in front of a bug or not.

If it's the case, I manage to create a test to help you reproduce the case, but 
if it isn't, maybe Spark documentation could explain when 
{{sparkSession.getContext()}} can return {{{}null{}}}.

 

I'm willing to ease my development by separating :
 * parquet files management \{ checking existence, then loading them as cache, 
or saving data to them },
 * from dataset creation, when it doesn't exist yet, and should be constituted 
from scratch.

 

The method I'm using is this one:

 
{code:java}
{code:Java}
protected Dataset<Row> constitutionStandard(OptionsCreationLecture 
optionsCreationLecture,
   Supplier<Dataset<Row>> worker, CacheParqueteur<? extends TriParqueteurIF> 
cacheParqueteur) {
   OptionsCreationLecture options = optionsCreationLecture != null ? 
optionsCreationLecture : optionsCreationLecture();

   Dataset<Row> dataset = cacheParqueteur.call(options.useCache());
   return dataset == null ? 
cacheParqueteur.save(cacheParqueteur.appliquer(worker.get())) : dataset;
}
{code}
In case the dataset doesn't exist in parquet files (= cache) yet, it starts its 
creation by calling a {{worker.get()}} that is a {{Supplier}} of 
{{{}Dataset<Row>{}}}.

 

 

A concrete usage is this one:

 

 
{code:java}
{code:Java}
public Dataset<Row> rowEtablissements(OptionsCreationLecture 
optionsCreationLecture, HistoriqueExecution historiqueExecution, int anneeCOG, 
int anneeSIRENE, boolean actifsSeulement, boolean communesValides, boolean 
nomenclaturesNAF2Valides) {
   OptionsCreationLecture options = optionsCreationLecture != null ? 
optionsCreationLecture : optionsCreationLecture();
   Supplier<Dataset<Row>> worker = () -> {
      super.setStageDescription(this.messageSource, 
"row.etablissements.libelle.long", "row.etablissements.libelle.court", 
anneeSIRENE, anneeCOG, actifsSeulement, communesValides, 
nomenclaturesNAF2Valides);
      
      Map<String, Integer> indexs = new HashMap<>();
      Dataset<Row> etablissements = 
etablissementsNonFiltres(optionsCreationLecture, anneeSIRENE);

      etablissements = etablissements.filter(
         (FilterFunction<Row>)etablissement -> 
this.validator.validationEtablissement(this.session, historiqueExecution, 
etablissement, actifsSeulement, nomenclaturesNAF2Valides, indexs));

      // Si le filtrage par communes valides a été demandé, l'appliquer.
      if (communesValides) {
         etablissements = rowRestreindreAuxCommunesValides(etablissements, 
anneeCOG, anneeSIRENE, indexs);
      }
      else {
         etablissements = etablissements.withColumn("codeDepartement", 
substring(CODE_COMMUNE.col(), 1, 2));
      }

      // Associer les libellés des codes APE/NAF.
      Dataset<Row> nomenclatureNAF = 
this.nafDataset.rowNomenclatureNAF(anneeSIRENE);
      etablissements = etablissements.join(nomenclatureNAF, 
etablissements.col("activitePrincipale").equalTo(nomenclatureNAF.col("codeNAF"))
 , "left_outer")
         .drop("codeNAF", "niveauNAF");

      // Le dataset est maintenant considéré comme valide, et ses champs 
peuvent être castés dans leurs types définitifs.
      return this.validator.cast(etablissements);
   };

   return constitutionStandard(options, () -> worker.get()
      .withColumn("partitionSiren", SIREN_ENTREPRISE.col().substr(1,2)),
      new CacheParqueteur<>(options, this.session,
         "etablissements", 
"annee_{0,number,#0}-actifs_{1}-communes_verifiees_{2}-nafs_verifies_{3}", 
DEPARTEMENT_SIREN_SIRET,
         anneeSIRENE, anneeCOG, actifsSeulement, communesValides));
} {code}
 

In the worker, a filter calls a {{validationEtablissement(SparkSession, 
HistoriqueExecution, Row, ...)}} on each row to perform complete checking 
(eight rules to check for an establishment validity).

 

When a check fails, along with a warning log, I'm also counting in 
{{historiqueExecution}} object the number of problems of that kind I've 
encountered.

That function increase a {{longAccumulator}} value, and create that accumulator 
first, that it stores in a {{{}Map<String, LongAccumulator> accumulators{}}},  
if needed. 
{code:java}
{code:Java}
public void incrementerOccurrences(SparkSession session, String 
codeOuFormatMessage, boolean creerSiAbsent) {
   LongAccumulator accumulator = accumulators.get(codeOuFormatMessage);

   if (accumulator == null && creerSiAbsent) {
      accumulator = session.sparkContext().longAccumulator(codeOuFormatMessage);
      accumulators.put(codeOuFormatMessage, accumulator);
   }

   if (accumulator != null) {
      accumulator.add(1);
   }
}
{code}
 

 

Or at least, it should. But my problem is that it isn't the case.

During Dataset constitution :

 

*1)* If I initialize the {{historiqueExecution}} variable with the exhaustive 
list of messages it can have to count, +*before*+ the {{worker.get()}} is 
called by the {{constitutionStandard}} method, the dataset is perfectly 
constituted and I can dump my counters :
{code:java}
{code:Java}
historiqueExecution.setCodesMessages(this.session,
   "etablissement sans SIRET, peut être étranger : '{}'",
   "etablissement au SIRET '{}' invalide, écarté.",
   "etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET 
vaut : {}), écarté.",
   "etablissement au SIREN d'entreprise '{}' invalide, écarté.",
   "établissement de SIRET {} écarté : il n'a pas de nomemclature APE.",
   "établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue.",
   "établissement de SIRET {} écarté : il n'a pas de code APE.",
   "établissement de SIRET {} écarté : son code APE {} est invalide.",
   "établissement de SIRET {} écarté : son type de voie d'adresse principale, 
{}, est invalide",
   "établissement de SIRET {} écarté : son type de voie d'adresse secondaire, 
{}, est invalide",
   "établissement de SIRET {} écarté : sa date de création, {}, est invalide",
   "établissement de SIRET {} écarté : sa date de dernier traitement, {}, est 
invalide : {}",
   "établissement de SIRET {} écarté : sa date d'historisation, {}, est 
invalide");// code placeholder
{code}



{noformat}
etablissement au SIRET '{}' invalide, écarté. : 0
établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue. : 0
etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET 
vaut : {}), écarté. : 0
etablissement sans SIRET, peut être étranger : '{}' : 0
établissement de SIRET {} écarté : son code APE {} est invalide. : 0
établissement de SIRET {} écarté : il n'a pas de code APE. : 0
établissement de SIRET {} écarté : sa date d'historisation, {}, est invalide : 0
établissement de SIRET {} écarté : son type de voie d'adresse principale, {}, 
est invalide : 2
établissement de SIRET {} écarté : sa date de dernier traitement, {}, est 
invalide : {} : 0
etablissement au SIREN d'entreprise '{}' invalide, écarté. : 0
établissement de SIRET {} écarté : il n'a pas de nomemclature APE. : 0
établissement de SIRET {} écarté : son type de voie d'adresse secondaire, {}, 
est invalide : 0
établissement de SIRET {} écarté : sa date de création, {}, est invalide : 
0{noformat}
*2)* But if I don't initialize that list, and that I leave the creation of the 
missing {{longAccumulator}} to the worker itself, at runtime (filter time), 
then the :

{{session.sparkContext().longAccumulator(codeOuFormatMessage);}}



fails on a {{{}NullPointerException{}}}, as {{sparkContext()}} returns null.

 

My first question is : is it normal that, when taking the real actions to build 
the dataset, the spark session returns a {{null}} context?

 


> SparkSession returns a null context during a dataset creation
> -------------------------------------------------------------
>
>                 Key: SPARK-47156
>                 URL: https://issues.apache.org/jira/browse/SPARK-47156
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.4.2
>         Environment: Debian 12
> Java 17
>            Reporter: Marc Le Bihan
>            Priority: Major
>
> I need first to know if I'm in front of a bug or not.
> If it's the case, I'll manage to create a test to help you reproduce the 
> case, but if it isn't, maybe Spark documentation could explain when 
> {{sparkSession.getContext()}} can return {{{}null{}}}.
>  
> I'm willing to ease my development by separating :
>  * parquet files management \{ checking existence, then loading them as 
> cache, or saving data to them },
>  * from dataset creation, when it doesn't exist yet, and should be 
> constituted from scratch.
>  
> The method I'm using is this one:
> {code:java}
> {code:Java}
> protected Dataset<Row> constitutionStandard(OptionsCreationLecture 
> optionsCreationLecture,
>    Supplier<Dataset<Row>> worker, CacheParqueteur<? extends TriParqueteurIF> 
> cacheParqueteur) {
>    OptionsCreationLecture options = optionsCreationLecture != null ? 
> optionsCreationLecture : optionsCreationLecture();
>    Dataset<Row> dataset = cacheParqueteur.call(options.useCache());
>    return dataset == null ? 
> cacheParqueteur.save(cacheParqueteur.appliquer(worker.get())) : dataset;
> }
> {code}
> In case the dataset doesn't exist in parquet files (= cache) yet, it starts 
> its creation by calling a {{worker.get()}} that is a {{Supplier}} of 
> {{{}Dataset<Row>{}}}.
>  
> A concrete usage is this one:
> {code:java}
> {code:Java}
> public Dataset<Row> rowEtablissements(OptionsCreationLecture 
> optionsCreationLecture, HistoriqueExecution historiqueExecution, int 
> anneeCOG, int anneeSIRENE, boolean actifsSeulement, boolean communesValides, 
> boolean nomenclaturesNAF2Valides) {
>    OptionsCreationLecture options = optionsCreationLecture != null ? 
> optionsCreationLecture : optionsCreationLecture();
>    Supplier<Dataset<Row>> worker = () -> {
>       super.setStageDescription(this.messageSource, 
> "row.etablissements.libelle.long", "row.etablissements.libelle.court", 
> anneeSIRENE, anneeCOG, actifsSeulement, communesValides, 
> nomenclaturesNAF2Valides);
>       
>       Map<String, Integer> indexs = new HashMap<>();
>       Dataset<Row> etablissements = 
> etablissementsNonFiltres(optionsCreationLecture, anneeSIRENE);
>       etablissements = etablissements.filter(
>          (FilterFunction<Row>)etablissement -> 
> this.validator.validationEtablissement(this.session, historiqueExecution, 
> etablissement, actifsSeulement, nomenclaturesNAF2Valides, indexs));
>       // Si le filtrage par communes valides a été demandé, l'appliquer.
>       if (communesValides) {
>          etablissements = rowRestreindreAuxCommunesValides(etablissements, 
> anneeCOG, anneeSIRENE, indexs);
>       }
>       else {
>          etablissements = etablissements.withColumn("codeDepartement", 
> substring(CODE_COMMUNE.col(), 1, 2));
>       }
>       // Associer les libellés des codes APE/NAF.
>       Dataset<Row> nomenclatureNAF = 
> this.nafDataset.rowNomenclatureNAF(anneeSIRENE);
>       etablissements = etablissements.join(nomenclatureNAF, 
> etablissements.col("activitePrincipale").equalTo(nomenclatureNAF.col("codeNAF"))
>  , "left_outer")
>          .drop("codeNAF", "niveauNAF");
>       // Le dataset est maintenant considéré comme valide, et ses champs 
> peuvent être castés dans leurs types définitifs.
>       return this.validator.cast(etablissements);
>    };
>    return constitutionStandard(options, () -> worker.get()
>       .withColumn("partitionSiren", SIREN_ENTREPRISE.col().substr(1,2)),
>       new CacheParqueteur<>(options, this.session,
>          "etablissements", 
> "annee_{0,number,#0}-actifs_{1}-communes_verifiees_{2}-nafs_verifies_{3}", 
> DEPARTEMENT_SIREN_SIRET,
>          anneeSIRENE, anneeCOG, actifsSeulement, communesValides));
> } {code}
>  
> In the worker, a filter calls a {{validationEtablissement(SparkSession, 
> HistoriqueExecution, Row, ...)}} on each row to perform complete checking 
> (eight rules to check for an establishment validity).
> When a check fails, along with a warning log, I'm also counting in 
> {{historiqueExecution}} object the number of problems of that kind I've 
> encountered.
> That function increase a {{longAccumulator}} value, and create that 
> accumulator first, that it stores in a {{{}Map<String, LongAccumulator> 
> accumulators{}}},  if needed.
> {code:java}
> {code:Java}
> public void incrementerOccurrences(SparkSession session, String 
> codeOuFormatMessage, boolean creerSiAbsent) {
>    LongAccumulator accumulator = accumulators.get(codeOuFormatMessage);
>    if (accumulator == null && creerSiAbsent) {
>       accumulator = 
> session.sparkContext().longAccumulator(codeOuFormatMessage);
>       accumulators.put(codeOuFormatMessage, accumulator);
>    }
>    if (accumulator != null) {
>       accumulator.add(1);
>    }
> }
> {code}
>  
> Or at least, it should. But my problem is that it isn't the case.
> During Dataset constitution :
> *1)* If I initialize the {{historiqueExecution}} variable with the exhaustive 
> list of messages it can have to count, +*before*+ the {{worker.get()}} is 
> called by the {{constitutionStandard}} method, the dataset is perfectly 
> constituted and I can dump my counters :
> {code:java}
> {code:Java}
> historiqueExecution.setCodesMessages(this.session,
>    "etablissement sans SIRET, peut être étranger : '{}'",
>    "etablissement au SIRET '{}' invalide, écarté.",
>    "etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son 
> SIRET vaut : {}), écarté.",
>    "etablissement au SIREN d'entreprise '{}' invalide, écarté.",
>    "établissement de SIRET {} écarté : il n'a pas de nomemclature APE.",
>    "établissement de SIRET {} écarté : sa nomemclature {} n'est plus 
> soutenue.",
>    "établissement de SIRET {} écarté : il n'a pas de code APE.",
>    "établissement de SIRET {} écarté : son code APE {} est invalide.",
>    "établissement de SIRET {} écarté : son type de voie d'adresse principale, 
> {}, est invalide",
>    "établissement de SIRET {} écarté : son type de voie d'adresse secondaire, 
> {}, est invalide",
>    "établissement de SIRET {} écarté : sa date de création, {}, est invalide",
>    "établissement de SIRET {} écarté : sa date de dernier traitement, {}, est 
> invalide : {}",
>    "établissement de SIRET {} écarté : sa date d'historisation, {}, est 
> invalide");// code placeholder
> {code}
> {noformat}
> etablissement au SIRET '{}' invalide, écarté. : 0
> établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue. : 0
> etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET 
> vaut : {}), écarté. : 0
> etablissement sans SIRET, peut être étranger : '{}' : 0
> établissement de SIRET {} écarté : son code APE {} est invalide. : 0
> établissement de SIRET {} écarté : il n'a pas de code APE. : 0
> établissement de SIRET {} écarté : sa date d'historisation, {}, est invalide 
> : 0
> établissement de SIRET {} écarté : son type de voie d'adresse principale, {}, 
> est invalide : 2
> établissement de SIRET {} écarté : sa date de dernier traitement, {}, est 
> invalide : {} : 0
> etablissement au SIREN d'entreprise '{}' invalide, écarté. : 0
> établissement de SIRET {} écarté : il n'a pas de nomemclature APE. : 0
> établissement de SIRET {} écarté : son type de voie d'adresse secondaire, {}, 
> est invalide : 0
> établissement de SIRET {} écarté : sa date de création, {}, est invalide : 
> 0{noformat}
> *2)* But if I don't initialize that list, and that I leave the creation of 
> the missing {{longAccumulator}} to the worker itself, at runtime (filter 
> time), then the :
> {{session.sparkContext().longAccumulator(codeOuFormatMessage);}}
> fails on a {{{}NullPointerException{}}}, as {{sparkContext()}} returns null.
>  
> My first question is : is it normal that, when taking the real actions to 
> build the dataset, the spark session returns a {{null}} context?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to