[ https://issues.apache.org/jira/browse/SPARK-47156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Marc Le Bihan updated SPARK-47156: ---------------------------------- Description: I need first to know if I'm in front of a bug or not. If it's the case, I'll manage to create a test to help you reproduce the case, but if it isn't, maybe Spark documentation could explain when {{sparkSession.getContext()}} can return {{{}null{}}}. I'm willing to ease my development by separating : * parquet files management \{ checking existence, then loading them as cache, or saving data to them }, * from dataset creation, when it doesn't exist yet, and should be constituted from scratch. The method I'm using is this one: {code:java} {code:Java} protected Dataset<Row> constitutionStandard(OptionsCreationLecture optionsCreationLecture, Supplier<Dataset<Row>> worker, CacheParqueteur<? extends TriParqueteurIF> cacheParqueteur) { OptionsCreationLecture options = optionsCreationLecture != null ? optionsCreationLecture : optionsCreationLecture(); Dataset<Row> dataset = cacheParqueteur.call(options.useCache()); return dataset == null ? cacheParqueteur.save(cacheParqueteur.appliquer(worker.get())) : dataset; } {code} In case the dataset doesn't exist in parquet files (= cache) yet, it starts its creation by calling a {{worker.get()}} that is a {{Supplier}} of {{{}Dataset<Row>{}}}. A concrete usage is this one: {code:java} {code:Java} public Dataset<Row> rowEtablissements(OptionsCreationLecture optionsCreationLecture, HistoriqueExecution historiqueExecution, int anneeCOG, int anneeSIRENE, boolean actifsSeulement, boolean communesValides, boolean nomenclaturesNAF2Valides) { OptionsCreationLecture options = optionsCreationLecture != null ? optionsCreationLecture : optionsCreationLecture(); Supplier<Dataset<Row>> worker = () -> { super.setStageDescription(this.messageSource, "row.etablissements.libelle.long", "row.etablissements.libelle.court", anneeSIRENE, anneeCOG, actifsSeulement, communesValides, nomenclaturesNAF2Valides); Map<String, Integer> indexs = new HashMap<>(); Dataset<Row> etablissements = etablissementsNonFiltres(optionsCreationLecture, anneeSIRENE); etablissements = etablissements.filter( (FilterFunction<Row>)etablissement -> this.validator.validationEtablissement(this.session, historiqueExecution, etablissement, actifsSeulement, nomenclaturesNAF2Valides, indexs)); // Si le filtrage par communes valides a été demandé, l'appliquer. if (communesValides) { etablissements = rowRestreindreAuxCommunesValides(etablissements, anneeCOG, anneeSIRENE, indexs); } else { etablissements = etablissements.withColumn("codeDepartement", substring(CODE_COMMUNE.col(), 1, 2)); } // Associer les libellés des codes APE/NAF. Dataset<Row> nomenclatureNAF = this.nafDataset.rowNomenclatureNAF(anneeSIRENE); etablissements = etablissements.join(nomenclatureNAF, etablissements.col("activitePrincipale").equalTo(nomenclatureNAF.col("codeNAF")) , "left_outer") .drop("codeNAF", "niveauNAF"); // Le dataset est maintenant considéré comme valide, et ses champs peuvent être castés dans leurs types définitifs. return this.validator.cast(etablissements); }; return constitutionStandard(options, () -> worker.get() .withColumn("partitionSiren", SIREN_ENTREPRISE.col().substr(1,2)), new CacheParqueteur<>(options, this.session, "etablissements", "annee_{0,number,#0}-actifs_{1}-communes_verifiees_{2}-nafs_verifies_{3}", DEPARTEMENT_SIREN_SIRET, anneeSIRENE, anneeCOG, actifsSeulement, communesValides)); } {code} In the worker, a filter calls a {{validationEtablissement(SparkSession, HistoriqueExecution, Row, ...)}} on each row to perform complete checking (eight rules to check for an establishment validity). When a check fails, along with a warning log, I'm also counting in {{historiqueExecution}} object the number of problems of that kind I've encountered. That function increase a {{longAccumulator}} value, and create that accumulator first, that it stores in a {{{}Map<String, LongAccumulator> accumulators{}}}, if needed. {code:java} {code:Java} public void incrementerOccurrences(SparkSession session, String codeOuFormatMessage, boolean creerSiAbsent) { LongAccumulator accumulator = accumulators.get(codeOuFormatMessage); if (accumulator == null && creerSiAbsent) { accumulator = session.sparkContext().longAccumulator(codeOuFormatMessage); accumulators.put(codeOuFormatMessage, accumulator); } if (accumulator != null) { accumulator.add(1); } } {code} Or at least, it should. But my problem is that it isn't the case. During Dataset constitution : *1)* If I initialize the {{historiqueExecution}} variable with the exhaustive list of messages it can have to count, +*before*+ the {{worker.get()}} is called by the {{constitutionStandard}} method, the dataset is perfectly constituted and I can dump my counters : {code:java} {code:Java} historiqueExecution.setCodesMessages(this.session, "etablissement sans SIRET, peut être étranger : '{}'", "etablissement au SIRET '{}' invalide, écarté.", "etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET vaut : {}), écarté.", "etablissement au SIREN d'entreprise '{}' invalide, écarté.", "établissement de SIRET {} écarté : il n'a pas de nomemclature APE.", "établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue.", "établissement de SIRET {} écarté : il n'a pas de code APE.", "établissement de SIRET {} écarté : son code APE {} est invalide.", "établissement de SIRET {} écarté : son type de voie d'adresse principale, {}, est invalide", "établissement de SIRET {} écarté : son type de voie d'adresse secondaire, {}, est invalide", "établissement de SIRET {} écarté : sa date de création, {}, est invalide", "établissement de SIRET {} écarté : sa date de dernier traitement, {}, est invalide : {}", "établissement de SIRET {} écarté : sa date d'historisation, {}, est invalide");// code placeholder {code} {noformat} etablissement au SIRET '{}' invalide, écarté. : 0 établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue. : 0 etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET vaut : {}), écarté. : 0 etablissement sans SIRET, peut être étranger : '{}' : 0 établissement de SIRET {} écarté : son code APE {} est invalide. : 0 établissement de SIRET {} écarté : il n'a pas de code APE. : 0 établissement de SIRET {} écarté : sa date d'historisation, {}, est invalide : 0 établissement de SIRET {} écarté : son type de voie d'adresse principale, {}, est invalide : 2 établissement de SIRET {} écarté : sa date de dernier traitement, {}, est invalide : {} : 0 etablissement au SIREN d'entreprise '{}' invalide, écarté. : 0 établissement de SIRET {} écarté : il n'a pas de nomemclature APE. : 0 établissement de SIRET {} écarté : son type de voie d'adresse secondaire, {}, est invalide : 0 établissement de SIRET {} écarté : sa date de création, {}, est invalide : 0{noformat} *2)* But if I don't initialize that list, and that I leave the creation of the missing {{longAccumulator}} to the worker itself, at runtime (filter time), then the : {{session.sparkContext().longAccumulator(codeOuFormatMessage);}} fails on a {{{}NullPointerException{}}}, as {{sparkContext()}} returns null. My first question is : is it normal that, when taking the real actions to build the dataset, the spark session returns a {{null}} context? was: I need first to know if I'm in front of a bug or not. If it's the case, I manage to create a test to help you reproduce the case, but if it isn't, maybe Spark documentation could explain when {{sparkSession.getContext()}} can return {{{}null{}}}. I'm willing to ease my development by separating : * parquet files management \{ checking existence, then loading them as cache, or saving data to them }, * from dataset creation, when it doesn't exist yet, and should be constituted from scratch. The method I'm using is this one: {code:java} {code:Java} protected Dataset<Row> constitutionStandard(OptionsCreationLecture optionsCreationLecture, Supplier<Dataset<Row>> worker, CacheParqueteur<? extends TriParqueteurIF> cacheParqueteur) { OptionsCreationLecture options = optionsCreationLecture != null ? optionsCreationLecture : optionsCreationLecture(); Dataset<Row> dataset = cacheParqueteur.call(options.useCache()); return dataset == null ? cacheParqueteur.save(cacheParqueteur.appliquer(worker.get())) : dataset; } {code} In case the dataset doesn't exist in parquet files (= cache) yet, it starts its creation by calling a {{worker.get()}} that is a {{Supplier}} of {{{}Dataset<Row>{}}}. A concrete usage is this one: {code:java} {code:Java} public Dataset<Row> rowEtablissements(OptionsCreationLecture optionsCreationLecture, HistoriqueExecution historiqueExecution, int anneeCOG, int anneeSIRENE, boolean actifsSeulement, boolean communesValides, boolean nomenclaturesNAF2Valides) { OptionsCreationLecture options = optionsCreationLecture != null ? optionsCreationLecture : optionsCreationLecture(); Supplier<Dataset<Row>> worker = () -> { super.setStageDescription(this.messageSource, "row.etablissements.libelle.long", "row.etablissements.libelle.court", anneeSIRENE, anneeCOG, actifsSeulement, communesValides, nomenclaturesNAF2Valides); Map<String, Integer> indexs = new HashMap<>(); Dataset<Row> etablissements = etablissementsNonFiltres(optionsCreationLecture, anneeSIRENE); etablissements = etablissements.filter( (FilterFunction<Row>)etablissement -> this.validator.validationEtablissement(this.session, historiqueExecution, etablissement, actifsSeulement, nomenclaturesNAF2Valides, indexs)); // Si le filtrage par communes valides a été demandé, l'appliquer. if (communesValides) { etablissements = rowRestreindreAuxCommunesValides(etablissements, anneeCOG, anneeSIRENE, indexs); } else { etablissements = etablissements.withColumn("codeDepartement", substring(CODE_COMMUNE.col(), 1, 2)); } // Associer les libellés des codes APE/NAF. Dataset<Row> nomenclatureNAF = this.nafDataset.rowNomenclatureNAF(anneeSIRENE); etablissements = etablissements.join(nomenclatureNAF, etablissements.col("activitePrincipale").equalTo(nomenclatureNAF.col("codeNAF")) , "left_outer") .drop("codeNAF", "niveauNAF"); // Le dataset est maintenant considéré comme valide, et ses champs peuvent être castés dans leurs types définitifs. return this.validator.cast(etablissements); }; return constitutionStandard(options, () -> worker.get() .withColumn("partitionSiren", SIREN_ENTREPRISE.col().substr(1,2)), new CacheParqueteur<>(options, this.session, "etablissements", "annee_{0,number,#0}-actifs_{1}-communes_verifiees_{2}-nafs_verifies_{3}", DEPARTEMENT_SIREN_SIRET, anneeSIRENE, anneeCOG, actifsSeulement, communesValides)); } {code} In the worker, a filter calls a {{validationEtablissement(SparkSession, HistoriqueExecution, Row, ...)}} on each row to perform complete checking (eight rules to check for an establishment validity). When a check fails, along with a warning log, I'm also counting in {{historiqueExecution}} object the number of problems of that kind I've encountered. That function increase a {{longAccumulator}} value, and create that accumulator first, that it stores in a {{{}Map<String, LongAccumulator> accumulators{}}}, if needed. {code:java} {code:Java} public void incrementerOccurrences(SparkSession session, String codeOuFormatMessage, boolean creerSiAbsent) { LongAccumulator accumulator = accumulators.get(codeOuFormatMessage); if (accumulator == null && creerSiAbsent) { accumulator = session.sparkContext().longAccumulator(codeOuFormatMessage); accumulators.put(codeOuFormatMessage, accumulator); } if (accumulator != null) { accumulator.add(1); } } {code} Or at least, it should. But my problem is that it isn't the case. During Dataset constitution : *1)* If I initialize the {{historiqueExecution}} variable with the exhaustive list of messages it can have to count, +*before*+ the {{worker.get()}} is called by the {{constitutionStandard}} method, the dataset is perfectly constituted and I can dump my counters : {code:java} {code:Java} historiqueExecution.setCodesMessages(this.session, "etablissement sans SIRET, peut être étranger : '{}'", "etablissement au SIRET '{}' invalide, écarté.", "etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET vaut : {}), écarté.", "etablissement au SIREN d'entreprise '{}' invalide, écarté.", "établissement de SIRET {} écarté : il n'a pas de nomemclature APE.", "établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue.", "établissement de SIRET {} écarté : il n'a pas de code APE.", "établissement de SIRET {} écarté : son code APE {} est invalide.", "établissement de SIRET {} écarté : son type de voie d'adresse principale, {}, est invalide", "établissement de SIRET {} écarté : son type de voie d'adresse secondaire, {}, est invalide", "établissement de SIRET {} écarté : sa date de création, {}, est invalide", "établissement de SIRET {} écarté : sa date de dernier traitement, {}, est invalide : {}", "établissement de SIRET {} écarté : sa date d'historisation, {}, est invalide");// code placeholder {code} {noformat} etablissement au SIRET '{}' invalide, écarté. : 0 établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue. : 0 etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET vaut : {}), écarté. : 0 etablissement sans SIRET, peut être étranger : '{}' : 0 établissement de SIRET {} écarté : son code APE {} est invalide. : 0 établissement de SIRET {} écarté : il n'a pas de code APE. : 0 établissement de SIRET {} écarté : sa date d'historisation, {}, est invalide : 0 établissement de SIRET {} écarté : son type de voie d'adresse principale, {}, est invalide : 2 établissement de SIRET {} écarté : sa date de dernier traitement, {}, est invalide : {} : 0 etablissement au SIREN d'entreprise '{}' invalide, écarté. : 0 établissement de SIRET {} écarté : il n'a pas de nomemclature APE. : 0 établissement de SIRET {} écarté : son type de voie d'adresse secondaire, {}, est invalide : 0 établissement de SIRET {} écarté : sa date de création, {}, est invalide : 0{noformat} *2)* But if I don't initialize that list, and that I leave the creation of the missing {{longAccumulator}} to the worker itself, at runtime (filter time), then the : {{session.sparkContext().longAccumulator(codeOuFormatMessage);}} fails on a {{{}NullPointerException{}}}, as {{sparkContext()}} returns null. My first question is : is it normal that, when taking the real actions to build the dataset, the spark session returns a {{null}} context? > SparkSession returns a null context during a dataset creation > ------------------------------------------------------------- > > Key: SPARK-47156 > URL: https://issues.apache.org/jira/browse/SPARK-47156 > Project: Spark > Issue Type: Bug > Components: Spark Core > Affects Versions: 3.4.2 > Environment: Debian 12 > Java 17 > Reporter: Marc Le Bihan > Priority: Major > > I need first to know if I'm in front of a bug or not. > If it's the case, I'll manage to create a test to help you reproduce the > case, but if it isn't, maybe Spark documentation could explain when > {{sparkSession.getContext()}} can return {{{}null{}}}. > > I'm willing to ease my development by separating : > * parquet files management \{ checking existence, then loading them as > cache, or saving data to them }, > * from dataset creation, when it doesn't exist yet, and should be > constituted from scratch. > > The method I'm using is this one: > {code:java} > {code:Java} > protected Dataset<Row> constitutionStandard(OptionsCreationLecture > optionsCreationLecture, > Supplier<Dataset<Row>> worker, CacheParqueteur<? extends TriParqueteurIF> > cacheParqueteur) { > OptionsCreationLecture options = optionsCreationLecture != null ? > optionsCreationLecture : optionsCreationLecture(); > Dataset<Row> dataset = cacheParqueteur.call(options.useCache()); > return dataset == null ? > cacheParqueteur.save(cacheParqueteur.appliquer(worker.get())) : dataset; > } > {code} > In case the dataset doesn't exist in parquet files (= cache) yet, it starts > its creation by calling a {{worker.get()}} that is a {{Supplier}} of > {{{}Dataset<Row>{}}}. > > A concrete usage is this one: > {code:java} > {code:Java} > public Dataset<Row> rowEtablissements(OptionsCreationLecture > optionsCreationLecture, HistoriqueExecution historiqueExecution, int > anneeCOG, int anneeSIRENE, boolean actifsSeulement, boolean communesValides, > boolean nomenclaturesNAF2Valides) { > OptionsCreationLecture options = optionsCreationLecture != null ? > optionsCreationLecture : optionsCreationLecture(); > Supplier<Dataset<Row>> worker = () -> { > super.setStageDescription(this.messageSource, > "row.etablissements.libelle.long", "row.etablissements.libelle.court", > anneeSIRENE, anneeCOG, actifsSeulement, communesValides, > nomenclaturesNAF2Valides); > > Map<String, Integer> indexs = new HashMap<>(); > Dataset<Row> etablissements = > etablissementsNonFiltres(optionsCreationLecture, anneeSIRENE); > etablissements = etablissements.filter( > (FilterFunction<Row>)etablissement -> > this.validator.validationEtablissement(this.session, historiqueExecution, > etablissement, actifsSeulement, nomenclaturesNAF2Valides, indexs)); > // Si le filtrage par communes valides a été demandé, l'appliquer. > if (communesValides) { > etablissements = rowRestreindreAuxCommunesValides(etablissements, > anneeCOG, anneeSIRENE, indexs); > } > else { > etablissements = etablissements.withColumn("codeDepartement", > substring(CODE_COMMUNE.col(), 1, 2)); > } > // Associer les libellés des codes APE/NAF. > Dataset<Row> nomenclatureNAF = > this.nafDataset.rowNomenclatureNAF(anneeSIRENE); > etablissements = etablissements.join(nomenclatureNAF, > etablissements.col("activitePrincipale").equalTo(nomenclatureNAF.col("codeNAF")) > , "left_outer") > .drop("codeNAF", "niveauNAF"); > // Le dataset est maintenant considéré comme valide, et ses champs > peuvent être castés dans leurs types définitifs. > return this.validator.cast(etablissements); > }; > return constitutionStandard(options, () -> worker.get() > .withColumn("partitionSiren", SIREN_ENTREPRISE.col().substr(1,2)), > new CacheParqueteur<>(options, this.session, > "etablissements", > "annee_{0,number,#0}-actifs_{1}-communes_verifiees_{2}-nafs_verifies_{3}", > DEPARTEMENT_SIREN_SIRET, > anneeSIRENE, anneeCOG, actifsSeulement, communesValides)); > } {code} > > In the worker, a filter calls a {{validationEtablissement(SparkSession, > HistoriqueExecution, Row, ...)}} on each row to perform complete checking > (eight rules to check for an establishment validity). > When a check fails, along with a warning log, I'm also counting in > {{historiqueExecution}} object the number of problems of that kind I've > encountered. > That function increase a {{longAccumulator}} value, and create that > accumulator first, that it stores in a {{{}Map<String, LongAccumulator> > accumulators{}}}, if needed. > {code:java} > {code:Java} > public void incrementerOccurrences(SparkSession session, String > codeOuFormatMessage, boolean creerSiAbsent) { > LongAccumulator accumulator = accumulators.get(codeOuFormatMessage); > if (accumulator == null && creerSiAbsent) { > accumulator = > session.sparkContext().longAccumulator(codeOuFormatMessage); > accumulators.put(codeOuFormatMessage, accumulator); > } > if (accumulator != null) { > accumulator.add(1); > } > } > {code} > > Or at least, it should. But my problem is that it isn't the case. > During Dataset constitution : > *1)* If I initialize the {{historiqueExecution}} variable with the exhaustive > list of messages it can have to count, +*before*+ the {{worker.get()}} is > called by the {{constitutionStandard}} method, the dataset is perfectly > constituted and I can dump my counters : > {code:java} > {code:Java} > historiqueExecution.setCodesMessages(this.session, > "etablissement sans SIRET, peut être étranger : '{}'", > "etablissement au SIRET '{}' invalide, écarté.", > "etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son > SIRET vaut : {}), écarté.", > "etablissement au SIREN d'entreprise '{}' invalide, écarté.", > "établissement de SIRET {} écarté : il n'a pas de nomemclature APE.", > "établissement de SIRET {} écarté : sa nomemclature {} n'est plus > soutenue.", > "établissement de SIRET {} écarté : il n'a pas de code APE.", > "établissement de SIRET {} écarté : son code APE {} est invalide.", > "établissement de SIRET {} écarté : son type de voie d'adresse principale, > {}, est invalide", > "établissement de SIRET {} écarté : son type de voie d'adresse secondaire, > {}, est invalide", > "établissement de SIRET {} écarté : sa date de création, {}, est invalide", > "établissement de SIRET {} écarté : sa date de dernier traitement, {}, est > invalide : {}", > "établissement de SIRET {} écarté : sa date d'historisation, {}, est > invalide");// code placeholder > {code} > {noformat} > etablissement au SIRET '{}' invalide, écarté. : 0 > établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue. : 0 > etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET > vaut : {}), écarté. : 0 > etablissement sans SIRET, peut être étranger : '{}' : 0 > établissement de SIRET {} écarté : son code APE {} est invalide. : 0 > établissement de SIRET {} écarté : il n'a pas de code APE. : 0 > établissement de SIRET {} écarté : sa date d'historisation, {}, est invalide > : 0 > établissement de SIRET {} écarté : son type de voie d'adresse principale, {}, > est invalide : 2 > établissement de SIRET {} écarté : sa date de dernier traitement, {}, est > invalide : {} : 0 > etablissement au SIREN d'entreprise '{}' invalide, écarté. : 0 > établissement de SIRET {} écarté : il n'a pas de nomemclature APE. : 0 > établissement de SIRET {} écarté : son type de voie d'adresse secondaire, {}, > est invalide : 0 > établissement de SIRET {} écarté : sa date de création, {}, est invalide : > 0{noformat} > *2)* But if I don't initialize that list, and that I leave the creation of > the missing {{longAccumulator}} to the worker itself, at runtime (filter > time), then the : > {{session.sparkContext().longAccumulator(codeOuFormatMessage);}} > fails on a {{{}NullPointerException{}}}, as {{sparkContext()}} returns null. > > My first question is : is it normal that, when taking the real actions to > build the dataset, the spark session returns a {{null}} context? > -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org