Hi Spark community,

I'm very new to the Apache Spark community, but if this (very active) group
is anything like the other Apache project user groups I've worked with, I'm
going to enjoy the discussions here very much. Thanks in advance!

Use Case:
I am trying to go from flat files of user response data, to contingency
tables of frequency counts, to Pearson chi-squared statistics, and then to
perform a chi-squared test of independence on each table. The user response
data comes from a multiple-choice question (MCQ) format. The goal is to
compute a contingency table for every choose-two combination of questions
(precondition, question X question). Each cell of a contingency table is
the number of distinct users in the intersection of the responders to one
option of each of the two questions.
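
For concreteness, a hypothetical table for one question pair under one
precondition, say q1 with options {A, B} and q2 with options {X, Y} (the
counts are made up); each cell is a distinct-user count:

         q2=X   q2=Y
  q1=A     12      7
  q1=B      3     21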

An overview of the problem:
// data ingestion and typing schema:
//   Observation(u: String, d: java.util.Date, t: String, q: String, v: String, a: Int)
// a question (q) has a finite set of response options (v), per which a user (u) responds
// additional response fields are not required for this test
for (precondition a) {
  for (q_i in lex-ordered questions) {
    for (q_j in lex-ordered questions, q_j > q_i) {
      forall v_k in q_i: get the set of distinct users {u}_ik
      forall v_l in q_j: get the set of distinct users {u}_jl
      forall cells of table (a, q_i, q_j): C_ijkl = |intersect({u}_ik, {u}_jl)|  // contingency table construction
      compute the chi-squared test for this contingency table and persist the result
    }
  }
}
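
In Scala terms, a single cell is just the size of a set intersection; a
minimal illustration with made-up user ids:

  val u_ik = Set("u1", "u2", "u3")   // distinct users who chose v_k on q_i
  val u_jl = Set("u2", "u3", "u4")   // distinct users who chose v_l on q_j
  val c_ijkl = (u_ik intersect u_jl).size   // C_ijkl = |{u2, u3}| = 2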

The Scala main I'm testing is provided below. I was planning to use the
example at https://spark.apache.org/docs/1.3.1/mllib-statistics.html, but I
am not sure how to go from my RDD[Observation] to the RDD[Vector] (or other
input) that the example expects for ingestion.

  import org.apache.spark.{SparkConf, SparkContext}

  // data ETL and typing schema (defined outside main so it serializes cleanly)
  case class Observation(u: String, d: java.util.Date, t: String, q: String, v: String, a: Int)

  def main(args: Array[String]): Unit = {
    // set up the context for the test
    val conf = new SparkConf().setAppName("TestMain")
    val sc = new SparkContext(conf)

    // parse the comma-separated lines into Observations
    val date_format = new java.text.SimpleDateFormat("yyyyMMdd")
    val data_project_abs_dir = "/my/path/to/data/files"
    val data_files = data_project_abs_dir + "/part-*.gz"
    val data = sc.textFile(data_files)
    val observations = data
      .map(line => line.split(",").map(_.trim))
      .map(r => Observation(r(0), date_format.parse(r(1)), r(2), r(3), r(4), r(5).toInt))
    observations.cache()

    // ToDo: the basic keying of the space, possibly...
    // distinct (precondition, question, option) -> user pairs
    val qvu = observations.map(o => ((o.a, o.q, o.v), o.u)).distinct()

    // ToDo: ok, so now how to get this into the precondition RDD[Vector] from
    // the Spark example to make a contingency table?...

    // ToDo: perform then persist the resulting chisq and p-value on these
    // contingency tables...
  }
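
For the ToDos, here is a sketch of the direction I'm considering. If I'm
reading the linked docs page right, the contingency-table form of
Statistics.chiSqTest takes an mllib.linalg.Matrix rather than an
RDD[Vector], so I build one small Matrix per (a, q_i, q_j) triple. This
assumes the distinct (a, q, v) key space and its user sets are small enough
to collect to the driver, and persisting is stubbed out with a println:

  import org.apache.spark.mllib.linalg.Matrices
  import org.apache.spark.mllib.stat.Statistics

  // one distinct-user set per (precondition, question, option), grouped by (a, q);
  // assumes this all fits on the driver
  val userSets: Map[(Int, String), Map[String, Set[String]]] =
    qvu.groupByKey()
       .mapValues(_.toSet)
       .collect()
       .groupBy { case ((a, q, _), _) => (a, q) }
       .mapValues(_.map { case ((_, _, v), us) => (v, us) }.toMap)

  for {
    ((a, q_i), optsI) <- userSets
    ((a2, q_j), optsJ) <- userSets
    if a2 == a && q_i < q_j              // lex-ordered pairing, q_j > q_i
  } {
    val vks = optsI.keys.toSeq.sorted    // rows: options v_k of q_i
    val vls = optsJ.keys.toSeq.sorted    // columns: options v_l of q_j
    // cell counts C_ijkl = |intersect({u}_ik, {u}_jl)|, column-major for Matrices.dense
    val cells = for (vl <- vls; vk <- vks)
      yield (optsI(vk) intersect optsJ(vl)).size.toDouble
    val table = Matrices.dense(vks.size, vls.size, cells.toArray)
    val result = Statistics.chiSqTest(table)   // Pearson's independence test
    println(s"($a, $q_i, $q_j): chisq = ${result.statistic}, p = ${result.pValue}")
  }

Does this look like a reasonable approach, or is there a more idiomatic way
to get from the keyed RDD to the per-pair Matrix inputs?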


Any help is appreciated.

Thanks!  -Dan
