> What if you just do a join with the first condition (equal chromosome) and
> append a select with the rest of the conditions after join?  This will
> allow you to test your query step by step, maybe with a visual inspection
> to figure out what the problem is. It may be a data quality problem as
> well, not with your query...
> On 11/27/22 12:30 PM, Oliver Ruebenacker wrote:
>      Hello,
>   I have two Dataframes I want to join using a condition such that each
> record from each Dataframe may be joined with multiple records from the
> other Dataframe. This means the original records should appear multiple
> times in the resulting joined Dataframe if the condition is fulfilled for
> multiple pairings of that record and records from the other Dataframe.
>   Is this possible, and if so, how? I tried "inner", "outer", "cross" and
> "full", but never got the desired result.
>   One Dataframe contains genetic variants with chromosome and position,
> the other genes with chromosome, start and end. I want to obtain each
> variant-gene pair where the variant is sufficiently close to the gene,
> which means on the same chromosome and between start end end or within a
> padding. The join is this:
> *cond = (genes.chromosome == variants.chromosome) & \
>  (genes.start - padding <= variants.position) & \            (genes.end +
> padding >= variants.position)     gene_variants =
> genes.join(variants.alias('variants'), cond, "left_outer") *
>   The entire file looks like this:
> *import argparse from pyspark.sql import SparkSession def main():     """
>     Arguments: none     """     arg_parser =
> argparse.ArgumentParser(prog='huge.py')
> arg_parser.add_argument("--phenotype", help="The phenotype", required=True)
>     arg_parser.add_argument("--genes", help="Gene data with regions",
> required=True)     arg_parser.add_argument("--gene-associations",
> help="Gene data with p-values", required=True)
> arg_parser.add_argument("--variants", help="Variant data", required=True)
>   arg_parser.add_argument("--padding", help="Variants are considered this
> far away from the gene")     cli_args = arg_parser.parse_args()
> phenotype = cli_args.phenotype     files_glob = 'part-*'     genes_glob =
> cli_args.genes + files_glob     genes_assoc_glob =
> cli_args.gene_associations + files_glob     variants_glob =
> cli_args.variants + files_glob     padding = getattr(cli_args, 'padding',
> 100000)     print('Phenotype: ' + phenotype)     print('Genes data with
> regions: ' + genes_glob)     print('Gene data with p-values: ' +
> genes_assoc_glob)     print('Variant data: ' + variants_glob)     spark =
> SparkSession.builder.appName('huge').getOrCreate()     print('Genes from '
> + genes_glob + ':')     genes_regions_raw = spark.read.json(genes_glob)
> gene_regions = genes_regions_raw.select('chromosome', 'start', 'end',
> 'source', 'name')\         .filter(genes_regions_raw.source ==
> 'symbol').drop(genes_regions_raw.source)     print('There are ' +
> str(gene_regions.count()) + ' gene regions:')     for row in
> gene_regions.take(42):         print(row)     gene_p_values =
> spark.read.json(genes_assoc_glob).select('gene', 'pValue')     print('There
> are ' + str(gene_p_values.count()) + ' gene associations')     for row in
> gene_p_values.take(42):         print(row)     genes =
> gene_regions.join(gene_p_values, gene_regions.name
> <http://gene_regions.name> == gene_p_values.gene)     print("Joined gene
> data gives " + str(genes.count()) + ' rows:')     for row in
> genes.take(42):         print(row)     variants =
> spark.read.json(variants_glob).select('chromosome', 'position',
> 'reference', 'alt', 'pValue')     print('There is data from ' +
> str(variants.count()) + ' variants:')     for row in variants.take(42):
>     print(row)     cond = (genes.chromosome == variants.chromosome) & \
>        (genes.start - padding <= variants.position) & \
>  (genes.end + padding >= variants.position)     gene_variants =
> genes.join(variants.alias('variants'), cond, "left_outer")
> print('Joining genes and variants give ' + str(gene_variants.count()) + '
> pairs:')     for row in gene_variants.take(42):         print(row)
> print('Stopping Spark')     spark.stop() if __name__ == '__main__':
> main() *
>   Thanks!
>      Best, Oliver
