frankgh commented on code in PR #45:
URL: 
https://github.com/apache/cassandra-analytics/pull/45#discussion_r1535498626


##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java:
##########
@@ -87,11 +87,21 @@ private CassandraBulkWriterContext(@NotNull BulkSparkConf 
conf,
         Set<String> udts = CqlUtils.extractUdts(keyspaceSchema, keyspace);
         ReplicationFactor replicationFactor = 
CqlUtils.extractReplicationFactor(keyspaceSchema, keyspace);
         int indexCount = CqlUtils.extractIndexCount(keyspaceSchema, keyspace, 
table);
-        CqlTable cqlTable = bridge.buildSchema(createTableSchema, keyspace, 
replicationFactor, partitioner, udts, null, indexCount);
+        CqlTable cqlTable = bridge().buildSchema(createTableSchema, keyspace, 
replicationFactor, partitioner, udts, null, indexCount);
 
         TableInfoProvider tableInfoProvider = new 
CqlTableInfoProvider(createTableSchema, cqlTable);
         TableSchema tableSchema = initializeTableSchema(conf, dfSchema, 
tableInfoProvider, lowestCassandraVersion);
-        schemaInfo = new CassandraSchemaInfo(tableSchema);
+        schemaInfo = new CassandraSchemaInfo(tableSchema, udts, cqlTable);
+    }
+
+    @Override
+    public synchronized CassandraBridge bridge()
+    {
+        if (this.bridge == null)
+        {
+            this.bridge = 
CassandraBridgeFactory.get(this.clusterInfo.getLowestCassandraVersion());

Review Comment:
   it feels like the driver should perform this operation once (it's a 
potentially expensive operation) and we should serialize the resulting version 
value to the executors. Executors should just use the version value that the 
driver computed.



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java:
##########
@@ -64,7 +65,6 @@ private CassandraBulkWriterContext(@NotNull BulkSparkConf 
conf,
         this.conf = conf;
         this.clusterInfo = clusterInfo;
         String lowestCassandraVersion = 
clusterInfo.getLowestCassandraVersion();
-        CassandraBridge bridge = 
CassandraBridgeFactory.get(lowestCassandraVersion);

Review Comment:
   Nothing wrong still initializing the bridge here. Should we preserve it?



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CqlTableInfoProvider.java:
##########
@@ -98,9 +98,20 @@ public CqlField.CqlType getColumnType(String columnName)
     @Override
     public List<ColumnType<?>> getPartitionKeyTypes()
     {
-        return cqlTable.partitionKeys().stream()
-                       .map(cqlField -> 
DATA_TYPES.get(cqlField.type().cqlName().toLowerCase()))
-                       .collect(Collectors.toList());
+        List<ColumnType<?>> types = cqlTable.partitionKeys().stream()
+                                              .map(cqlField -> {
+                                                  String typeName = 
cqlField.type().cqlName().toLowerCase();

Review Comment:
   why do we lower case the cqlName? Can we add tests for quoted UDTs as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to