What if you just do a join with the first condition (equal chromosome) and apply the rest of the conditions as a filter after the join? That would let you test your query step by step, maybe with a visual inspection, to figure out what the problem is. It may also be a data quality problem rather than a problem with your query...
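
For example (an untested sketch, reusing the names and padding from the quoted script below):

paired = genes.join(variants.alias('variants'),
                    genes.chromosome == variants.chromosome)
print('Pairs on the same chromosome: ' + str(paired.count()))
gene_variants = paired.filter(
    (genes.start - padding <= variants.position) &
    (genes.end + padding >= variants.position))

If even the chromosome-only join comes up empty, comparing genes.select('chromosome').distinct().show() with variants.select('chromosome').distinct().show() would show whether the chromosome values (or their types) actually match across the two sources.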

On 11/27/22 12:30 PM, Oliver Ruebenacker wrote:

     Hello,

  I have two Dataframes that I want to join using a condition such that each record from either Dataframe may be joined with multiple records from the other. This means a record should appear multiple times in the resulting joined Dataframe if the condition is fulfilled for multiple pairings of that record with records from the other Dataframe.

  Is this possible, and if so, how? I tried "inner", "outer", "cross" and "full", but never got the desired result.
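
  To illustrate with made-up toy data (the names and numbers are just for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# the variant at position 5 falls into both regions, the one at position 25 only into the second
variants_toy = spark.createDataFrame([(1, 5), (1, 25)], ['chromosome', 'position'])
genes_toy = spark.createDataFrame([(1, 0, 10), (1, 0, 30)], ['chromosome', 'start', 'end'])

  Joining these on equal chromosome with position between start and end should give three pairs: position 5 with region 0-10, position 5 with region 0-30, and position 25 with region 0-30.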

  One Dataframe contains genetic variants with chromosome and position, the other genes with chromosome, start and end. I want to obtain each variant-gene pair where the variant is sufficiently close to the gene, which means on the same chromosome and between start and end, or within a padding distance of them. The join is this:

cond = (genes.chromosome == variants.chromosome) & \
       (genes.start - padding <= variants.position) & \
       (genes.end + padding >= variants.position)
gene_variants = genes.join(variants.alias('variants'), cond, "left_outer")

  The entire file looks like this:

import argparse
from pyspark.sql import SparkSession


def main():
    """
    Arguments: none
    """
    arg_parser = argparse.ArgumentParser(prog='huge.py')
    arg_parser.add_argument("--phenotype", help="The phenotype", required=True)     arg_parser.add_argument("--genes", help="Gene data with regions", required=True)     arg_parser.add_argument("--gene-associations", help="Gene data with p-values", required=True)     arg_parser.add_argument("--variants", help="Variant data", required=True)     arg_parser.add_argument("--padding", help="Variants are considered this far away from the gene")
    cli_args = arg_parser.parse_args()
    phenotype = cli_args.phenotype
    files_glob = 'part-*'
    genes_glob = cli_args.genes + files_glob
    genes_assoc_glob = cli_args.gene_associations + files_glob
    variants_glob = cli_args.variants + files_glob
    padding = int(cli_args.padding) if cli_args.padding is not None else 100000
    print('Phenotype: ' + phenotype)
    print('Genes data with regions: ' + genes_glob)
    print('Gene data with p-values: ' + genes_assoc_glob)
    print('Variant data: ' + variants_glob)
    spark = SparkSession.builder.appName('huge').getOrCreate()
    print('Genes from ' + genes_glob + ':')
    genes_regions_raw = spark.read.json(genes_glob)
    gene_regions = genes_regions_raw.select('chromosome', 'start', 'end', 'source', 'name')\
        .filter(genes_regions_raw.source == 'symbol').drop(genes_regions_raw.source)
    print('There are ' + str(gene_regions.count()) + ' gene regions:')
    for row in gene_regions.take(42):
        print(row)
    gene_p_values = spark.read.json(genes_assoc_glob).select('gene', 'pValue')
    print('There are ' + str(gene_p_values.count()) + ' gene associations:')
    for row in gene_p_values.take(42):
        print(row)
    genes = gene_regions.join(gene_p_values, gene_regions.name == gene_p_values.gene)
    print("Joined gene data gives " + str(genes.count()) + ' rows:')
    for row in genes.take(42):
        print(row)
    variants = spark.read.json(variants_glob).select('chromosome', 'position', 'reference', 'alt', 'pValue')
    print('There is data from ' + str(variants.count()) + ' variants:')
    for row in variants.take(42):
        print(row)
    cond = (genes.chromosome == variants.chromosome) & \
           (genes.start - padding <= variants.position) & \
           (genes.end + padding >= variants.position)
    gene_variants = genes.join(variants.alias('variants'), cond, "left_outer")
    print('Joining genes and variants gives ' + str(gene_variants.count()) + ' pairs:')
    for row in gene_variants.take(42):
        print(row)
    print('Stopping Spark')
    spark.stop()


if __name__ == '__main__':
    main()

  Thanks!

     Best, Oliver

--
Oliver Ruebenacker, Ph.D. (he)
Senior Software Engineer, Knowledge Portal Network <http://kp4cd.org/>, Flannick Lab <http://www.flannicklab.org/>, Broad Institute <http://www.broadinstitute.org/>
