What if you just do a join with the first condition (equal chromosome)
and then apply a filter with the rest of the conditions after the join?
This will allow you to test your query step by step, perhaps with a
visual inspection, to figure out what the problem is. It may also be a
data quality problem rather than a problem with your query...
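
To make the two steps concrete, here is a minimal sketch in plain Python rather than Spark, so it can be run anywhere. The rows and the padding value are made up for illustration, and the helper names `join_on_chromosome` and `within_padding` are hypothetical. In PySpark the analogous steps would presumably be a join on the chromosome column followed by a `.filter(...)` with the two range conditions.

```python
# Toy rows; the values are made up for illustration only.
genes = [
    {"name": "G1", "chromosome": "1", "start": 1000, "end": 2000},
    {"name": "G2", "chromosome": "1", "start": 1500, "end": 2500},
    {"name": "G3", "chromosome": "2", "start": 1000, "end": 2000},
]
variants = [
    {"chromosome": "1", "position": 1800},
    {"chromosome": "1", "position": 5000},
    {"chromosome": "2", "position": 2050},
]
padding = 100  # assumed value, much smaller than in the real script


def join_on_chromosome(genes, variants):
    """Step 1: pair every gene with every variant on the same chromosome."""
    return [(g, v) for g in genes for v in variants
            if g["chromosome"] == v["chromosome"]]


def within_padding(pairs, padding):
    """Step 2: keep pairs where the variant lies in the padded gene region."""
    return [(g, v) for g, v in pairs
            if g["start"] - padding <= v["position"] <= g["end"] + padding]


step1 = join_on_chromosome(genes, variants)
step2 = within_padding(step1, padding)

# Inspect the intermediate and final results separately.
print(len(step1), "pairs after step 1")
print(len(step2), "pairs after step 2")
for g, v in step2:
    print(g["name"], v["chromosome"], v["position"])
```

In this toy data the variant at position 1800 ends up paired with both G1 and G2, which is the "one record, several matches" behaviour asked about. If the count after step 1 already looks wrong on the real data, the range conditions are not the culprit.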
On 11/27/22 12:30 PM, Oliver Ruebenacker wrote:
Hello,
I have two Dataframes I want to join using a condition such that
each record from each Dataframe may be joined with multiple records
from the other Dataframe. This means the original records should
appear multiple times in the resulting joined Dataframe if the
condition is fulfilled for multiple pairings of that record and
records from the other Dataframe.
Is this possible, and if so, how? I tried "inner", "outer", "cross"
and "full", but never got the desired result.
One Dataframe contains genetic variants with chromosome and
position, the other genes with chromosome, start and end. I want to
obtain each variant-gene pair where the variant is sufficiently close
to the gene, which means on the same chromosome and between start and
end or within a padding. The join is this:
    cond = (genes.chromosome == variants.chromosome) & \
        (genes.start - padding <= variants.position) & \
        (genes.end + padding >= variants.position)
    gene_variants = genes.join(variants.alias('variants'), cond, "left_outer")
The entire file looks like this:
    import argparse

    from pyspark.sql import SparkSession


    def main():
        """
        Arguments: none
        """
        arg_parser = argparse.ArgumentParser(prog='huge.py')
        arg_parser.add_argument("--phenotype", help="The phenotype", required=True)
        arg_parser.add_argument("--genes", help="Gene data with regions", required=True)
        arg_parser.add_argument("--gene-associations", help="Gene data with p-values", required=True)
        arg_parser.add_argument("--variants", help="Variant data", required=True)
        arg_parser.add_argument("--padding", help="Variants are considered this far away from the gene")
        cli_args = arg_parser.parse_args()
        phenotype = cli_args.phenotype
        files_glob = 'part-*'
        genes_glob = cli_args.genes + files_glob
        genes_assoc_glob = cli_args.gene_associations + files_glob
        variants_glob = cli_args.variants + files_glob
        padding = getattr(cli_args, 'padding', 100000)
        print('Phenotype: ' + phenotype)
        print('Genes data with regions: ' + genes_glob)
        print('Gene data with p-values: ' + genes_assoc_glob)
        print('Variant data: ' + variants_glob)
        spark = SparkSession.builder.appName('huge').getOrCreate()
        print('Genes from ' + genes_glob + ':')
        genes_regions_raw = spark.read.json(genes_glob)
        gene_regions = genes_regions_raw.select('chromosome', 'start', 'end', 'source', 'name')\
            .filter(genes_regions_raw.source == 'symbol').drop(genes_regions_raw.source)
        print('There are ' + str(gene_regions.count()) + ' gene regions:')
        for row in gene_regions.take(42):
            print(row)
        gene_p_values = spark.read.json(genes_assoc_glob).select('gene', 'pValue')
        print('There are ' + str(gene_p_values.count()) + ' gene associations')
        for row in gene_p_values.take(42):
            print(row)
        genes = gene_regions.join(gene_p_values, gene_regions.name == gene_p_values.gene)
        print("Joined gene data gives " + str(genes.count()) + ' rows:')
        for row in genes.take(42):
            print(row)
        variants = spark.read.json(variants_glob).select('chromosome', 'position', 'reference', 'alt', 'pValue')
        print('There is data from ' + str(variants.count()) + ' variants:')
        for row in variants.take(42):
            print(row)
        cond = (genes.chromosome == variants.chromosome) & \
            (genes.start - padding <= variants.position) & \
            (genes.end + padding >= variants.position)
        gene_variants = genes.join(variants.alias('variants'), cond, "left_outer")
        print('Joining genes and variants give ' + str(gene_variants.count()) + ' pairs:')
        for row in gene_variants.take(42):
            print(row)
        print('Stopping Spark')
        spark.stop()


    if __name__ == '__main__':
        main()
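
One detail in the script above that may be worth checking, independent of the join type, is how `padding` gets its value. `argparse` always sets an attribute for a declared option, using `None` when the flag is omitted, so the fallback in `getattr(cli_args, 'padding', 100000)` is never used. When the flag is given, the value arrives as a string unless a `type` is declared. The sketch below shows this with plain `argparse`; the `--padding-int` flag is a hypothetical name added only for comparison.

```python
import argparse


def build_parser():
    parser = argparse.ArgumentParser(prog='huge.py')
    # Declared as in the script: no default and no type.
    parser.add_argument("--padding")
    # Hypothetical alternative: parse as int and give the default up front.
    parser.add_argument("--padding-int", type=int, default=100000)
    return parser


# Case 1: neither flag is passed on the command line.
args = build_parser().parse_args([])
without_default = getattr(args, 'padding', 100000)  # attribute exists, so fallback is ignored
with_default = args.padding_int
print(repr(without_default))  # None
print(repr(with_default))     # 100000

# Case 2: both flags are passed.
args = build_parser().parse_args(["--padding", "500", "--padding-int", "500"])
print(repr(args.padding))      # '500' (a string)
print(repr(args.padding_int))  # 500 (an int)
```

If `padding` does end up as `None` in the join condition, the range comparisons would presumably evaluate to null for every row, which could explain a left outer join that never matches any variant. This is an assumption about the cause, not something confirmed from the data.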
Thanks!
Best, Oliver
--
Oliver Ruebenacker, Ph.D. (he)
Senior Software Engineer, Knowledge Portal Network
<http://kp4cd.org/>, Flannick Lab <http://www.flannicklab.org/>, Broad
Institute <http://www.broadinstitute.org/>