Bilind Hajer created SPARK-11190:
------------------------------------

             Summary: SparkR support for cassandra collection types. 
                 Key: SPARK-11190
                 URL: https://issues.apache.org/jira/browse/SPARK-11190
             Project: Spark
          Issue Type: Bug
    Affects Versions: 1.5.1
         Environment: SparkR Version: 1.5.1
Cassandra Version: 2.1.6
R Version: 3.2.2 
Cassandra Connector version: 1.5.0-M2
            Reporter: Bilind Hajer
             Fix For: 1.5.2


I want to create a data frame from a Cassandra keyspace and column family in 
sparkR. 
I am able to create data frames from tables which do not include any Cassandra 
collection datatypes, 
such as Map, Set and List.  But, many of the schemas that I need data from, do 
include these collection data types. 

Here is my local environment. 

SparkR Version: 1.5.1
Cassandra Version: 2.1.6
R Version: 3.2.2 
Cassandra Connector version: 1.5.0-M2

To test this issue, I did the following iterative process. 

sudo ./sparkR --packages 
com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M2 --conf 
spark.cassandra.connection.host=127.0.0.1

Running this command, with sparkR gives me access to the spark cassandra 
connector package I need, 
and connects me to my local cqlsh server ( which is up and running while 
running this code in sparkR shell ). 

CREATE TABLE test_table (
      column_1                         int,
      column_2                         text,
      column_3                         float,
      column_4                         uuid,
      column_5                         timestamp,
      column_6                         boolean,
      column_7                         timeuuid,
      column_8                         bigint,
      column_9                         blob,
      column_10                       ascii,
      column_11                       decimal,
      column_12                       double,
      column_13                       inet,
      column_14                       varchar,
      column_15                       varint,
      PRIMARY KEY( ( column_1, column_2 ) )
); 

All of the above data types are supported. I insert dummy data after creating 
this test schema. 

For example, now in my sparkR shell, I run the following code. 

df.test  <- read.df(sqlContext,  source = "org.apache.spark.sql.cassandra", 
keyspace = "datahub", table = "test_table")

assigns with no errors, then, 

> schema(df.test)

StructType
|-name = "column_1", type = "IntegerType", nullable = TRUE
|-name = "column_2", type = "StringType", nullable = TRUE
|-name = "column_10", type = "StringType", nullable = TRUE
|-name = "column_11", type = "DecimalType(38,18)", nullable = TRUE
|-name = "column_12", type = "DoubleType", nullable = TRUE
|-name = "column_13", type = "InetAddressType", nullable = TRUE
|-name = "column_14", type = "StringType", nullable = TRUE
|-name = "column_15", type = "DecimalType(38,0)", nullable = TRUE
|-name = "column_3", type = "FloatType", nullable = TRUE
|-name = "column_4", type = "UUIDType", nullable = TRUE
|-name = "column_5", type = "TimestampType", nullable = TRUE
|-name = "column_6", type = "BooleanType", nullable = TRUE
|-name = "column_7", type = "UUIDType", nullable = TRUE
|-name = "column_8", type = "LongType", nullable = TRUE
|-name = "column_9", type = "BinaryType", nullable = TRUE

Schema is correct. 

> class(df.test)
[1] "DataFrame"
attr(,"package")
[1] "SparkR"

df.test is clearly defined to be a DataFrame Object. 

> head(df.test)

  column_1 column_2 column_10 column_11 column_12 column_13 column_14 column_15
1        1    hello        NA        NA        NA        NA        NA        NA
  column_3 column_4 column_5 column_6 column_7 column_8 column_9
1      3.4       NA       NA       NA       NA       NA       NA

sparkR is reading from the column_family correctly, but now lets add a 
collection data type to the schema. 

Now I will drop that test_table, and recreate the table, with with an extra 
column of data type  map<text,int>

CREATE TABLE test_table (
      column_1                         int,
      column_2                         text,
      column_3                         float,
      column_4                         uuid,
      column_5                         timestamp,
      column_6                         boolean,
      column_7                         timeuuid,
      column_8                         bigint,
      column_9                         blob,
      column_10                        ascii,
      column_11                        decimal,
      column_12                        double,
      column_13                        inet,
      column_14                        varchar,
      column_15                        varint,
      column_16                        map<text,int>,
      PRIMARY KEY( ( column_1, column_2 ) )
); 

After inserting dummy data into the new test schema, 

> df.test  <- read.df(sqlContext,  source = "org.apache.spark.sql.cassandra", 
> keyspace = "datahub", table = "test_table")

assigns with no errors, 

> schema(df.test)

StructType
|-name = "column_1", type = "IntegerType", nullable = TRUE
|-name = "column_2", type = "StringType", nullable = TRUE
|-name = "column_10", type = "StringType", nullable = TRUE
|-name = "column_11", type = "DecimalType(38,18)", nullable = TRUE
|-name = "column_12", type = "DoubleType", nullable = TRUE
|-name = "column_13", type = "InetAddressType", nullable = TRUE
|-name = "column_14", type = "StringType", nullable = TRUE
|-name = "column_15", type = "DecimalType(38,0)", nullable = TRUE
|-name = "column_16", type = "MapType(StringType,IntegerType,true)", nullable = 
TRUE
|-name = "column_3", type = "FloatType", nullable = TRUE
|-name = "column_4", type = "UUIDType", nullable = TRUE
|-name = "column_5", type = "TimestampType", nullable = TRUE
|-name = "column_6", type = "BooleanType", nullable = TRUE
|-name = "column_7", type = "UUIDType", nullable = TRUE
|-name = "column_8", type = "LongType", nullable = TRUE
|-name = "column_9", type = "BinaryType", nullable = TRUE

correct schema is returned. 

> class(df.test)

[1] "DataFrame"
attr(,"package")
[1] "SparkR"

Object is a DataFrame, but now when we see if the dataFrame actually contains 
records from the column_family. 

> head(df.test)

Error in as.data.frame.default(x[[i]], optional = TRUE) : 
  cannot coerce class ""jobj"" to a data.frame

Note, I will always get the above error, when calling head on a data frame that 
is read from a cassandra column_family
that has one or more cassandra collection data types (list, set, map). 

I know R has list and vector data types which can probably be used to support 
Cassandra Sets, and Lists, but for Map, 
there are packages which enable Hash support, which can be used to replicate a 
cassandra map data type. 

Could we log this as a bug? Cassandra collection data types are widely used in 
our schemas, 
and we want to be able to build data frame from those Cassandra schemas. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to