Re: [PR] CASSANDRA-19418 - Changes to report additional bulk analytics job stats for instrumentation [cassandra-analytics]

2024-03-22 Thread via GitHub


frankgh merged PR #41:
URL: https://github.com/apache/cassandra-analytics/pull/41


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org



Re: [PR] CASSANDRA-19418 - Changes to report additional bulk analytics job stats for instrumentation [cassandra-analytics]

2024-03-21 Thread via GitHub


arjunashok commented on code in PR #41:
URL: https://github.com/apache/cassandra-analytics/pull/41#discussion_r1534349869


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java:
##
@@ -130,6 +130,22 @@ private void writeObject(ObjectOutputStream out) throws IOException
 out.writeUTF(ringEntry.fqdn());
 out.writeUTF(ringEntry.status());
 out.writeUTF(ringEntry.state());
+if (ringEntry.rack() != null)

Review Comment:
   Addressed




Re: [PR] CASSANDRA-19418 - Changes to report additional bulk analytics job stats for instrumentation [cassandra-analytics]

2024-03-21 Thread via GitHub


frankgh commented on code in PR #41:
URL: https://github.com/apache/cassandra-analytics/pull/41#discussion_r1534250572


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java:
##
@@ -130,6 +130,22 @@ private void writeObject(ObjectOutputStream out) throws IOException
 out.writeUTF(ringEntry.fqdn());
 out.writeUTF(ringEntry.status());
 out.writeUTF(ringEntry.state());
+if (ringEntry.rack() != null)

Review Comment:
   I don't think this is sufficient; I pinged you offline with a suggestion.



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/JobStatsImpl.java:
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.common.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link JobStats} that is used to record stats through the course of the
+ * Spark job execution and publish them. This implementation logs the stats when published.
+ */
+public class JobStatsImpl implements JobStats
+{
+private static final Logger LOGGER = LoggerFactory.getLogger(JobStatsImpl.class);
+private final transient Map jobStats = new HashMap<>();
+public void recordJobStats(Map stats)
+{
+jobStats.putAll(stats);
+}
+
+public void publishJobStats()

Review Comment:
   Can we add `@Override` annotations?
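A minimal sketch of the class with `@Override` on both interface methods, assuming the interface exposes exactly `recordJobStats(Map)` and `publishJobStats()` with `String` keys and values (the generic parameters did not survive in the quoted diff). `java.util.logging` replaces SLF4J to keep the example dependency-free, and the `jobStats()` accessor is hypothetical, added only so the recorded stats can be inspected.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

// Simplified stand-in for org.apache.cassandra.spark.common.JobStats,
// assuming String-to-String stats (an assumption, not confirmed by the diff)
interface JobStats
{
    void recordJobStats(Map<String, String> stats);

    void publishJobStats();
}

/**
 * Sketch of JobStatsImpl with @Override on each interface method.
 * java.util.logging stands in for SLF4J so the example has no external dependencies.
 */
class JobStatsImpl implements JobStats
{
    private static final Logger LOGGER = Logger.getLogger(JobStatsImpl.class.getName());
    private final transient Map<String, String> jobStats = new HashMap<>();

    @Override
    public void recordJobStats(Map<String, String> stats)
    {
        // Later calls overwrite earlier values for the same key
        jobStats.putAll(stats);
    }

    @Override
    public void publishJobStats()
    {
        // Only logs the accumulated stats, matching the class Javadoc in the diff
        LOGGER.info("Job Stats: " + jobStats);
    }

    // Hypothetical accessor, added only so the recorded stats can be inspected
    Map<String, String> jobStats()
    {
        return jobStats;
    }
}
```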




Re: [PR] CASSANDRA-19418 - Changes to report additional bulk analytics job stats for instrumentation [cassandra-analytics]

2024-03-21 Thread via GitHub


arjunashok commented on code in PR #41:
URL: https://github.com/apache/cassandra-analytics/pull/41#discussion_r1534232804


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java:
##
@@ -21,7 +21,9 @@
 
 import java.io.Serializable;
 
-public interface BulkWriterContext extends Serializable
+import org.apache.cassandra.spark.common.JobStats;
+
+public interface BulkWriterContext extends Serializable, JobStats

Review Comment:
   Makes sense. Incorporated this change




Re: [PR] CASSANDRA-19418 - Changes to report additional bulk analytics job stats for instrumentation [cassandra-analytics]

2024-03-21 Thread via GitHub


arjunashok commented on code in PR #41:
URL: https://github.com/apache/cassandra-analytics/pull/41#discussion_r1534231920


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java:
##
@@ -56,10 +57,12 @@ public class CassandraBulkWriterContext implements BulkWriterContext, KryoSerial
 private final CassandraClusterInfo clusterInfo;
 private final SchemaInfo schemaInfo;
 
-private CassandraBulkWriterContext(@NotNull BulkSparkConf conf,
-   @NotNull CassandraClusterInfo clusterInfo,
-   @NotNull StructType dfSchema,
-   SparkContext sparkContext)
+private final Map jobStats = new HashMap<>();

Review Comment:
   Addressed




Re: [PR] CASSANDRA-19418 - Changes to report additional bulk analytics job stats for instrumentation [cassandra-analytics]

2024-03-21 Thread via GitHub


arjunashok commented on code in PR #41:
URL: https://github.com/apache/cassandra-analytics/pull/41#discussion_r1534231627


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriteResult.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.util.List;
+
+import scala.Serializable;

Review Comment:
   Correct. Fixed.




Re: [PR] CASSANDRA-19418 - Changes to report additional bulk analytics job stats for instrumentation [cassandra-analytics]

2024-03-21 Thread via GitHub


arjunashok commented on code in PR #41:
URL: https://github.com/apache/cassandra-analytics/pull/41#discussion_r1533473809


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java:
##
@@ -129,6 +143,64 @@ private void persist(@NotNull JavaPairRDD sortedRDD, Str
 }
 }
 
+private void recordSuccessfulJobStats(List writeResults)
+{
+List streamResults = writeResults.stream()
+   .map(WriteResult::streamResults)
+   .flatMap(Collection::stream)
+   .collect(Collectors.toList());
+
+long rowCount = streamResults.stream().mapToLong(res -> res.rowCount).sum();
+long totalBytesWritten = streamResults.stream().mapToLong(res -> res.bytesWritten).sum();
+boolean hasClusterTopologyChanged = writeResults.stream()
+   .map(WriteResult::isClusterResizeDetected)
+   .anyMatch(b -> b);
+LOGGER.info("Bulk writer has written {} rows and {} bytes with cluster-resize status: {}",
+rowCount,
+totalBytesWritten,
+hasClusterTopologyChanged);
+writerContext.recordJobStats(new HashMap<>()
+{
+{
+put("rowsWritten", Long.toString(rowCount));
+put("bytesWritten", Long.toString(totalBytesWritten));
+put("jobStatus", "Succeeded");
+put("clusterResizeDetected", String.valueOf(hasClusterTopologyChanged));
+put("jobElapsedTimeMillis", Long.toString(getElapsedTimeMillis()));
+}
+});
+}
+
+private void recordFailureStats(String reason)
+{
+writerContext.recordJobStats(new HashMap<>()
+{
+{
+put("jobStatus", "Failed");
+put("failureReason", reason);
+put("jobElapsedTimeMillis", Long.toString(getElapsedTimeMillis()));
+}
+});
+}
+
+private long getElapsedTimeMillis()
+{
+long now = System.nanoTime();
+return TimeUnit.NANOSECONDS.toMillis(now - this.startTimeNanos);
+}
+
+/**
+ * Get a ref copy of BulkWriterContext broadcast variable and compose a function to transform a partition into StreamResult
+ *
+ * @param ctx BulkWriterContext broadcast variable
+ * @return FlatMapFunction
+ */
+private static FlatMapFunction>, WriteResult>
+partitionsFlatMapFunc(Broadcast ctx, String[] columnNames)

Review Comment:
   Makes sense. Reverted
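The `recordSuccessfulJobStats` hunk quoted above builds its stats maps with double-brace initialization (`new HashMap<>() {{ put(...); }}`), which defines an anonymous `HashMap` subclass at each call site. A minimal sketch of the same driver-side aggregation using a plain map follows; `StreamSummary`, `TaskResult`, and `JobStatsAggregator` are hypothetical stand-ins for `StreamResult`, `WriteResult`, and the relation class, while the stat keys are copied from the diff.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for StreamResult: only the two counters used by the stats code
class StreamSummary
{
    final long rowCount;
    final long bytesWritten;

    StreamSummary(long rowCount, long bytesWritten)
    {
        this.rowCount = rowCount;
        this.bytesWritten = bytesWritten;
    }
}

// Hypothetical stand-in for WriteResult: per-task stream results plus the resize flag
class TaskResult
{
    final List<StreamSummary> streamResults;
    final boolean clusterResizeDetected;

    TaskResult(List<StreamSummary> streamResults, boolean clusterResizeDetected)
    {
        this.streamResults = streamResults;
        this.clusterResizeDetected = clusterResizeDetected;
    }
}

class JobStatsAggregator
{
    // Builds the success stats on the driver from the task results returned by collect()
    static Map<String, String> successStats(List<TaskResult> results, long elapsedMillis)
    {
        List<StreamSummary> streams = results.stream()
                                             .map(r -> r.streamResults)
                                             .flatMap(Collection::stream)
                                             .collect(Collectors.toList());
        long rowCount = streams.stream().mapToLong(s -> s.rowCount).sum();
        long bytesWritten = streams.stream().mapToLong(s -> s.bytesWritten).sum();
        boolean resized = results.stream().anyMatch(r -> r.clusterResizeDetected);

        // Plain HashMap instead of double-brace initialization
        Map<String, String> stats = new HashMap<>();
        stats.put("rowsWritten", Long.toString(rowCount));
        stats.put("bytesWritten", Long.toString(bytesWritten));
        stats.put("jobStatus", "Succeeded");
        stats.put("clusterResizeDetected", String.valueOf(resized));
        stats.put("jobElapsedTimeMillis", Long.toString(elapsedMillis));
        return stats;
    }
}
```

Passing the elapsed time as a parameter keeps the sketch deterministic; the diff instead computes it from `System.nanoTime()`.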



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java:
##
@@ -125,40 +126,28 @@ private void writeObject(ObjectOutputStream out) throws IOException
 out.writeUTF(ringEntry.address());
 out.writeInt(ringEntry.port());
 out.writeUTF(ringEntry.datacenter());
-out.writeUTF(ringEntry.load());

Review Comment:
   Unfortunately, these fields are members of the `RingEntry` instance (part of the sidecar client library), so they cannot be removed here.
   
   I can add checks for the ones not in `ReplicaMetadata` for completeness, but the intention here was to avoid writing fields that we know are not used anyway.




Re: [PR] CASSANDRA-19418 - Changes to report additional bulk analytics job stats for instrumentation [cassandra-analytics]

2024-03-20 Thread via GitHub


JeetKunDoug commented on code in PR #41:
URL: https://github.com/apache/cassandra-analytics/pull/41#discussion_r1532764805


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java:
##
@@ -129,6 +143,64 @@ private void persist(@NotNull JavaPairRDD sortedRDD, Str
 }
 }
 
+private void recordSuccessfulJobStats(List writeResults)
+{
+List streamResults = writeResults.stream()
+   .map(WriteResult::streamResults)
+   .flatMap(Collection::stream)
+   .collect(Collectors.toList());
+
+long rowCount = streamResults.stream().mapToLong(res -> res.rowCount).sum();
+long totalBytesWritten = streamResults.stream().mapToLong(res -> res.bytesWritten).sum();
+boolean hasClusterTopologyChanged = writeResults.stream()
+   .map(WriteResult::isClusterResizeDetected)
+   .anyMatch(b -> b);
+LOGGER.info("Bulk writer has written {} rows and {} bytes with cluster-resize status: {}",
+rowCount,
+totalBytesWritten,
+hasClusterTopologyChanged);
+writerContext.recordJobStats(new HashMap<>()
+{
+{
+put("rowsWritten", Long.toString(rowCount));
+put("bytesWritten", Long.toString(totalBytesWritten));
+put("jobStatus", "Succeeded");
+put("clusterResizeDetected", String.valueOf(hasClusterTopologyChanged));
+put("jobElapsedTimeMillis", Long.toString(getElapsedTimeMillis()));
+}
+});
+}
+
+private void recordFailureStats(String reason)
+{
+writerContext.recordJobStats(new HashMap<>()
+{
+{
+put("jobStatus", "Failed");
+put("failureReason", reason);
+put("jobElapsedTimeMillis", Long.toString(getElapsedTimeMillis()));
+}
+});
+}
+
+private long getElapsedTimeMillis()
+{
+long now = System.nanoTime();
+return TimeUnit.NANOSECONDS.toMillis(now - this.startTimeNanos);
+}
+
+/**
+ * Get a ref copy of BulkWriterContext broadcast variable and compose a function to transform a partition into StreamResult
+ *
+ * @param ctx BulkWriterContext broadcast variable
+ * @return FlatMapFunction
+ */
+private static FlatMapFunction>, WriteResult>
+partitionsFlatMapFunc(Broadcast ctx, String[] columnNames)

Review Comment:
   Why rename this function from `writeRowsInPartition` to `partitionsFlatMapFunc`? The first name describes what the function does; the second describes which interface the returned value implements. One is "business domain" and the other is "implementation detail", and I think the former has much more value than the latter.



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/JobStats.java:
##
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.common;
+
+import java.util.Map;
+
+/**
+ * Interface to provide functionality to report Spark Job Statistics and/or properties
+ * that can optionally be instrumented. The default implementation merely logs these
+ * stats at the end of the job.
+ */
+public interface JobStats

Review Comment:
   See my other comments about how to implement this in `BulkWriterContext`: it may not need to be `Serializable` if we move it to an instance variable, as we've done with other parts of the `BulkWriterContext` interface.
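One possible reading of this suggestion, sketched under assumptions: instead of `BulkWriterContext extends Serializable, JobStats`, the context returns a stats component from an accessor, and the concrete context keeps that component in a `transient` field so the component itself never needs to be `Serializable`. All names here (`StatsPublisher`, `WriterContext`, `ExampleWriterContext`, `jobStats()`) are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stats component; deliberately not Serializable
interface StatsPublisher
{
    void recordJobStats(Map<String, String> stats);

    void publishJobStats();
}

class LoggingStatsPublisher implements StatsPublisher
{
    private final Map<String, String> stats = new HashMap<>();

    @Override
    public void recordJobStats(Map<String, String> newStats)
    {
        stats.putAll(newStats);
    }

    @Override
    public void publishJobStats()
    {
        System.out.println("Job Stats: " + stats);
    }
}

// Hypothetical context: the context is Serializable, stats are reached through an accessor
interface WriterContext extends Serializable
{
    StatsPublisher jobStats();
}

class ExampleWriterContext implements WriterContext
{
    private static final long serialVersionUID = 1L;

    // transient: skipped by Java serialization, so the publisher only exists on the driver
    private final transient StatsPublisher jobStats = new LoggingStatsPublisher();

    @Override
    public StatsPublisher jobStats()
    {
        return jobStats;
    }

    // Simulates shipping the context to an executor with plain Java serialization
    static ExampleWriterContext roundTrip(ExampleWriterContext context)
    {
        try
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes))
            {
                out.writeObject(context);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())))
            {
                return (ExampleWriterContext) in.readObject();
            }
        }
        catch (IOException | ClassNotFoundException e)
        {
            throw new IllegalStateException(e);
        }
    }
}
```

Because Java serialization skips `transient` fields and does not rerun field initializers, the deserialized copy sees `null`, which is consistent with recording and publishing stats only on the driver.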



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java:
##
@@ -21,7 +21,9 @@
 
 import java.io.Serializable;
 
-public interface BulkWriterContext extends Serializable
+import org.apache.cassandra.spark.common.JobStats;
+
+public interface BulkWriterContext extends Serializable, JobSta

Re: [PR] CASSANDRA-19418 - Changes to report additional bulk analytics job stats for instrumentation [cassandra-analytics]

2024-03-20 Thread via GitHub


frankgh commented on code in PR #41:
URL: https://github.com/apache/cassandra-analytics/pull/41#discussion_r1532396813


##
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java:
##
@@ -77,6 +78,23 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo
 private int sstableDataSizeInMB = 128;
 private int sstableWriteBatchSize = 2;
 
+private final Map jobStats = new HashMap<>();
+
+public Map jobStats()
+{
+return jobStats;
+}

Review Comment:
   NIT: missing line break
   ```suggestion
   }
   
   ```



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriteResult.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.util.List;
+
+import scala.Serializable;

Review Comment:
   Shouldn't this be `java.io.Serializable` instead?



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java:
##
@@ -59,6 +59,9 @@ public class SSTableWriter
 private final Map fileDigestMap = new HashMap<>();
 private final DigestAlgorithm digestAlgorithm;
 
+private long rowCount = 0;
+

Review Comment:
   NIT: remove extra line break
   ```suggestion
   ```



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java:
##
@@ -100,6 +103,15 @@ public void addRow(BigInteger token, Map boundValues) throws IOE
 }
 maxToken = token;
 cqlSSTableWriter.addRow(boundValues);
+rowCount += 1;
+}
+
+/**
+ * @return the total number of rows added

Review Comment:
   ```suggestion
* @return the total number of rows written
   ```



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java:
##
@@ -125,40 +126,28 @@ private void writeObject(ObjectOutputStream out) throws IOException
 out.writeUTF(ringEntry.address());
 out.writeInt(ringEntry.port());
 out.writeUTF(ringEntry.datacenter());
-out.writeUTF(ringEntry.load());

Review Comment:
   Shouldn't we handle nulls during serialization/deserialization instead?
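A minimal sketch of handling nulls at serialization time rather than dropping fields: `DataOutput.writeUTF` throws `NullPointerException` on a `null` string, so each optional field is written behind a presence flag and read back the same way. The helper names are hypothetical; inside `RingInstance`, `writeObject`/`readObject` could call them with the `ObjectOutputStream`/`ObjectInputStream` they receive, since those implement `DataOutput`/`DataInput`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class NullableUtf
{
    private NullableUtf()
    {
    }

    // Writes a presence flag first, because writeUTF(null) throws NullPointerException
    static void writeNullableUTF(DataOutput out, String value) throws IOException
    {
        out.writeBoolean(value != null);
        if (value != null)
        {
            out.writeUTF(value);
        }
    }

    // Mirrors writeNullableUTF: reads the flag, then the string only if it was present
    static String readNullableUTF(DataInput in) throws IOException
    {
        return in.readBoolean() ? in.readUTF() : null;
    }

    // Round-trips a required field (fqdn) and an optional one (rack), in write order
    static String[] roundTrip(String fqdn, String rack)
    {
        try
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(fqdn);          // required field, never null
            writeNullableUTF(out, rack); // optional field, may be null
            out.flush();

            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            return new String[]{ in.readUTF(), readNullableUTF(in) };
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }
}
```

The reader must consume fields in exactly the order the writer produced them, so the flag adds one byte per optional field but keeps the stream format unambiguous.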



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriteResult.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.util.List;
+
+import scala.Serializable;
+
+public class WriteResult implements Serializable

Review Comment:
   Let's add Javadocs here.
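Taking this together with the earlier `java.io.Serializable` comment, a sketch of a documented `WriteResult`. The accessor names `streamResults()` and `isClusterResizeDetected()` come from the method references in `recordSuccessfulJobStats`; the field types are assumptions, and `StreamResultStub` is a hypothetical stand-in for `StreamResult`, which must itself be serializable for the result to be serialized back to the driver.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;

// Hypothetical stand-in for StreamResult; it must also be Serializable
class StreamResultStub implements Serializable
{
    private static final long serialVersionUID = 1L;
    final long rowCount;

    StreamResultStub(long rowCount)
    {
        this.rowCount = rowCount;
    }
}

/**
 * Result of writing a single Spark partition, returned from the executor task to the driver.
 * Carries the stream results produced for the partition and whether a cluster resize was detected.
 */
class WriteResult implements Serializable // java.io.Serializable, not scala.Serializable
{
    private static final long serialVersionUID = 1L;

    private final List<StreamResultStub> streamResults;
    private final boolean clusterResizeDetected;

    /**
     * @param streamResults         the stream results produced while writing the partition
     * @param clusterResizeDetected whether the cluster topology changed during the write
     */
    WriteResult(List<StreamResultStub> streamResults, boolean clusterResizeDetected)
    {
        this.streamResults = streamResults;
        this.clusterResizeDetected = clusterResizeDetected;
    }

    /** @return the stream results for this partition */
    List<StreamResultStub> streamResults()
    {
        return streamResults;
    }

    /** @return {@code true} if a cluster resize was detected while writing this partition */
    boolean isClusterResizeDetected()
    {
        return clusterResizeDetected;
    }

    // Serializes and deserializes a result with plain Java serialization
    static WriteResult roundTrip(WriteResult result)
    {
        try
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes))
            {
                out.writeObject(result);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())))
            {
                return (WriteResult) in.readObject();
            }
        }
        catch (IOException | ClassNotFoundException e)
        {
            throw new IllegalStateException(e);
        }
    }
}
```

Depending only on `java.io.Serializable` keeps this Java class free of a Scala type in its signature.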



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriteResult.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distribute

Re: [PR] CASSANDRA-19418 - Changes to report additional bulk analytics job stats for instrumentation [cassandra-analytics]

2024-03-11 Thread via GitHub


arjunashok commented on code in PR #41:
URL: https://github.com/apache/cassandra-analytics/pull/41#discussion_r1520641363


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java:
##
@@ -21,7 +21,9 @@
 
 import java.io.Serializable;
 
-public interface BulkWriterContext extends Serializable
+import org.apache.cassandra.spark.common.Reportable;
+
+public interface BulkWriterContext extends Serializable, Reportable

Review Comment:
   We could potentially have this implemented by the concrete `CassandraBulkWriterContext`, to keep it consistent with how `dialHome` is currently implemented.




Re: [PR] CASSANDRA-19418 - Changes to report additional bulk analytics job stats for instrumentation [cassandra-analytics]

2024-03-11 Thread via GitHub


arjunashok commented on code in PR #41:
URL: https://github.com/apache/cassandra-analytics/pull/41#discussion_r1520627909


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java:
##
@@ -107,17 +112,25 @@ private void persist(@NotNull JavaPairRDD sortedRDD, Str
 {
 try
 {
-sortedRDD.foreachPartition(writeRowsInPartition(broadcastContext, columnNames));
+List results = sortedRDD
+ .mapPartitions(partitionsFlatMapFunc(broadcastContext, columnNames))
+ .collect();
+long rowCount = results.stream().mapToLong(res -> res.rowCount).sum();
+long totalBytesWritten = results.stream().mapToLong(res -> res.bytesWritten).sum();
+LOGGER.info("Bulk writer has written {} rows and {} bytes", rowCount, totalBytesWritten);
+recordSuccessfulJobStats(rowCount, totalBytesWritten);
 }
 catch (Throwable throwable)
 {
+recordFailureStats(throwable.getMessage());
 LOGGER.error("Bulk Write Failed", throwable);
 throw new RuntimeException("Bulk Write to Cassandra has failed", throwable);
 }
 finally
 {
 try
 {
+writerContext.publishJobStats();

Review Comment:
   You're right. Fixed to only report/publish stats from the driver. 




Re: [PR] CASSANDRA-19418 - Changes to report additional bulk analytics job stats for instrumentation [cassandra-analytics]

2024-03-05 Thread via GitHub


arjunashok commented on code in PR #41:
URL: https://github.com/apache/cassandra-analytics/pull/41#discussion_r1513798245


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java:
##
@@ -107,17 +112,25 @@ private void persist(@NotNull JavaPairRDD sortedRDD, Str
 {
 try
 {
-sortedRDD.foreachPartition(writeRowsInPartition(broadcastContext, columnNames));
+List results = sortedRDD
+ .mapPartitions(partitionsFlatMapFunc(broadcastContext, columnNames))
+ .collect();
+long rowCount = results.stream().mapToLong(res -> res.rowCount).sum();
+long totalBytesWritten = results.stream().mapToLong(res -> res.bytesWritten).sum();
+LOGGER.info("Bulk writer has written {} rows and {} bytes", rowCount, totalBytesWritten);
+recordSuccessfulJobStats(rowCount, totalBytesWritten);
 }
 catch (Throwable throwable)
 {
+recordFailureStats(throwable.getMessage());
 LOGGER.error("Bulk Write Failed", throwable);
 throw new RuntimeException("Bulk Write to Cassandra has failed", throwable);
 }
 finally
 {
 try
 {
+writerContext.publishJobStats();

Review Comment:
   So, the change does not propagate data back to the driver; it publishes stats from the executors. I am assuming that the context is made available to the executors, so what you are suggesting does not need to happen. Let me know if that makes sense.
   
   I have been able to validate this using the in-jvm-dtests.




Re: [PR] CASSANDRA-19418 - Changes to report additional bulk analytics job stats for instrumentation [cassandra-analytics]

2024-03-05 Thread via GitHub


arjunashok commented on code in PR #41:
URL: https://github.com/apache/cassandra-analytics/pull/41#discussion_r1513793354


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java:
##
@@ -21,7 +21,9 @@
 
 import java.io.Serializable;
 
-public interface BulkWriterContext extends Serializable
+import org.apache.cassandra.spark.common.Reportable;
+
+public interface BulkWriterContext extends Serializable, Reportable

Review Comment:
   So, the functionality provided by the new interface replaces the existing `dialHome` method in `CassandraBulkWriterContext`.
   
   The thinking is that this is tied to the "context" that is shared across executors, as we "record" initial stats and job status stats at the executor level and the "inflight" stats at the task level.
   




Re: [PR] CASSANDRA-19418 - Changes to report additional bulk analytics job stats for instrumentation [cassandra-analytics]

2024-03-05 Thread via GitHub


arjunashok commented on code in PR #41:
URL: https://github.com/apache/cassandra-analytics/pull/41#discussion_r1513793217


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java:
##
@@ -49,6 +49,7 @@ public RingInstance(ReplicaMetadata replica)
  .datacenter(replica.datacenter())
  .state(replica.state())
  .status(replica.status())
+ .token("")

Review Comment:
   Answered in the response below




Re: [PR] CASSANDRA-19418 - Changes to report additional bulk analytics job stats for instrumentation [cassandra-analytics]

2024-03-05 Thread via GitHub


arjunashok commented on code in PR #41:
URL: https://github.com/apache/cassandra-analytics/pull/41#discussion_r1513793094


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java:
##
@@ -125,40 +126,28 @@ private void writeObject(ObjectOutputStream out) throws IOException
 out.writeUTF(ringEntry.address());
 out.writeInt(ringEntry.port());
 out.writeUTF(ringEntry.datacenter());
-out.writeUTF(ringEntry.load());

Review Comment:
   Since we are now returning the `StreamResult` back from the tasks, the existing implementation would throw NPEs while serializing the contained `RingInstance`, because many of these fields are not defined when we create the `RingInstance` from `ReplicaMetadata`. The change drops the unused `RingInstance` fields from serialization.
   
   Likewise, the change explicitly sets the `token` field to a default for the same reason, since `token` is part of the `equals`/`hashCode` checks.
   
   Stack trace:
   
   ```
   Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 40) (172.20.10.7 executor driver): com.esotericsoftware.kryo.KryoException: Error during Java serialization.
   Serialization trace:
   instance (org.apache.cassandra.spark.bulkwriter.CommitResult)
   commitResults (org.apache.cassandra.spark.bulkwriter.StreamResult)
       at org.apache.cassandra.spark.bulkwriter.util.SbwJavaSerializer.write(SbwJavaSerializer.java:58)
       at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
   ```
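For comparison with defaulting `token` to `""`, a sketch of null-tolerant equality: `java.util.Objects.equals` and `Objects.hash` accept `null` arguments, so an unset field does not cause an NPE in `equals`/`hashCode`. This assumes the failure comes from dereferencing a null field there; `InstanceKey` and its fields are hypothetical, not the actual `RingInstance`.

```java
import java.util.Objects;

// Hypothetical value type with an optional token, illustrating null-tolerant equality
final class InstanceKey
{
    private final String address;
    private final int port;
    private final String token; // may be null when built from metadata that lacks it

    InstanceKey(String address, int port, String token)
    {
        this.address = address;
        this.port = port;
        this.token = token;
    }

    @Override
    public boolean equals(Object other)
    {
        if (this == other)
        {
            return true;
        }
        if (!(other instanceof InstanceKey))
        {
            return false;
        }
        InstanceKey that = (InstanceKey) other;
        // Objects.equals treats two nulls as equal and never dereferences a null token
        return port == that.port
               && Objects.equals(address, that.address)
               && Objects.equals(token, that.token);
    }

    @Override
    public int hashCode()
    {
        // Objects.hash accepts null elements
        return Objects.hash(address, port, token);
    }
}
```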
   
   




Re: [PR] CASSANDRA-19418 - Changes to report additional bulk analytics job stats for instrumentation [cassandra-analytics]

2024-03-05 Thread via GitHub


arjunashok commented on code in PR #41:
URL: https://github.com/apache/cassandra-analytics/pull/41#discussion_r1513792433


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/Reportable.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.common;
+
+import java.util.Map;
+
+/**
+ * Interface to provide functionality to report Spark Job Statistics and/or properties
+ * that can optionally be instrumented. The default implementation merely logs these
+ * stats at the end of the job.
+ */
+public interface Reportable

Review Comment:
   This is not meant to be specific to the writer, so maybe it can be renamed to `JobStats`?
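A writer-agnostic contract along the lines suggested here might look like the sketch below. `JobStatsPublisher` and `LoggingJobStatsPublisher` are hypothetical names chosen for illustration, not the interface that was merged; the logging default mirrors the Javadoc's statement that the default implementation merely logs the stats at the end of the job, and the log sink is injected as a `Consumer<String>` so it can be tested.

```java
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical writer-agnostic contract for publishing job stats; nothing in it
// refers to the bulk writer, so a reader job could implement it too.
interface JobStatsPublisher
{
    // Receives the accumulated stats once, at the end of a job.
    void publish(Map<String, String> stats);
}

// Default behaviour described in the Javadoc: simply log the stats.
class LoggingJobStatsPublisher implements JobStatsPublisher
{
    private final Consumer<String> log;

    LoggingJobStatsPublisher(Consumer<String> log)
    {
        this.log = log;
    }

    @Override
    public void publish(Map<String, String> stats)
    {
        log.accept("Job stats: " + stats);
    }
}
```

In production code the sink could be any logger method reference, for example `new LoggingJobStatsPublisher(java.util.logging.Logger.getLogger("JobStats")::info)`.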






Re: [PR] CASSANDRA-19418 - Changes to report additional bulk analytics job stats for instrumentation [cassandra-analytics]

2024-03-05 Thread via GitHub


arjunashok commented on code in PR #41:
URL: 
https://github.com/apache/cassandra-analytics/pull/41#discussion_r1513792138


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/Reportable.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.common;

Review Comment:
   Makes sense






Re: [PR] CASSANDRA-19418 - Changes to report additional bulk analytics job stats for instrumentation [cassandra-analytics]

2024-03-01 Thread via GitHub


frankgh commented on code in PR #41:
URL: 
https://github.com/apache/cassandra-analytics/pull/41#discussion_r1509715774


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java:
##
@@ -21,7 +21,9 @@
 
 import java.io.Serializable;
 
-public interface BulkWriterContext extends Serializable
+import org.apache.cassandra.spark.common.Reportable;
+
+public interface BulkWriterContext extends Serializable, Reportable

Review Comment:
   I don't think this belongs in the `BulkWriterContext`. The context is something we share with executors, but the job stats are tracked only from the driver. I would avoid adding anything else to the `BulkWriterContext`.



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/Reportable.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.common;

Review Comment:
   Should this be in a stats or metrics package instead?



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java:
##
@@ -49,6 +49,7 @@ public RingInstance(ReplicaMetadata replica)
  .datacenter(replica.datacenter())
  .state(replica.state())
  .status(replica.status())
+ .token("")

Review Comment:
   Why this change?



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java:
##
@@ -107,17 +112,25 @@ private void persist(@NotNull JavaPairRDD sortedRDD, Str
 {
 try
 {
-sortedRDD.foreachPartition(writeRowsInPartition(broadcastContext, columnNames));
+List results = sortedRDD
+ .mapPartitions(partitionsFlatMapFunc(broadcastContext, columnNames))
+ .collect();
+long rowCount = results.stream().mapToLong(res -> res.rowCount).sum();
+long totalBytesWritten = results.stream().mapToLong(res -> res.bytesWritten).sum();
+LOGGER.info("Bulk writer has written {} rows and {} bytes", rowCount, totalBytesWritten);
+recordSuccessfulJobStats(rowCount, totalBytesWritten);
 }
 catch (Throwable throwable)
 {
+recordFailureStats(throwable.getMessage());
 LOGGER.error("Bulk Write Failed", throwable);
 throw new RuntimeException("Bulk Write to Cassandra has failed", throwable);
 }
 finally
 {
 try
 {
+writerContext.publishJobStats();

Review Comment:
   I don't think this will do what you think it will do. This object _should_ be treated as a read-only object from the point of view of the executors. Data won't propagate back to the driver once the executors complete.
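The reviewer's point can be demonstrated without Spark: an executor works on a deserialized copy, so mutating that copy leaves the driver's instance untouched. `ContextSketch` and `ShipToExecutor.copyOf` are hypothetical, and a plain Java serialization round trip only approximates how Spark actually ships task closures and broadcast values.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical context carrying a mutable stats map, standing in for an object
// the driver serializes and ships to executors.
class ContextSketch implements Serializable
{
    private static final long serialVersionUID = 1L;
    final Map<String, String> stats = new HashMap<>();
}

final class ShipToExecutor
{
    private ShipToExecutor()
    {
    }

    // Approximates shipping an object to an executor: serialize it, then
    // deserialize an independent copy on the "remote" side.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copyOf(T value) throws IOException, ClassNotFoundException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes))
        {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())))
        {
            return (T) in.readObject();
        }
    }
}
```

Stats gathered on executors therefore have to travel back explicitly, for example as task results returned through `collect()` as in the `persist` change above, rather than through the shared context.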



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/Reportable.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.common;
+
+import java.util.Map;
+
+/**
+ * Interface to provide functionality to report Spark Job Statistics and/or properties
+ * that can optionally be instrumented. The default implementation merely logs these
+ * stats at the end of the job.
+ */

[PR] CASSANDRA-19418 - Changes to report additional bulk analytics job stats for instrumentation [cassandra-analytics]

2024-02-21 Thread via GitHub


arjunashok opened a new pull request, #41:
URL: https://github.com/apache/cassandra-analytics/pull/41

   ## Changes
   
   - Relays the results of the Spark tasks to the executor to instrument the total number of rows written.
   - Abstracts the current `dialHome` implementation within `CassandraBulkWriterContext` into a pluggable `Reportable` interface, which allows job parameters to be recorded both at the executor level and at the task level.
   - The abstraction also separates recording the metadata as the job progresses from publishing it at the end of the job, together with the success/failure status and, where applicable, the failure reason.
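The flow described in these bullets (aggregate per-task results, record properties while the job runs, then publish once at the end with the success/failure status and failure reason) can be sketched in plain Java. `TaskResult`, `DriverJobStats`, and the stat keys are hypothetical; the actual change brings per-partition results back with `mapPartitions(...).collect()`, which this sketch replaces with a plain `List`. The recorder is deliberately not `Serializable`, so it stays on the driver.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical per-task result, similar in spirit to what each partition returns.
final class TaskResult
{
    final long rowCount;
    final long bytesWritten;

    TaskResult(long rowCount, long bytesWritten)
    {
        this.rowCount = rowCount;
        this.bytesWritten = bytesWritten;
    }
}

// Hypothetical driver-side recorder: accumulates stats, publishes them once.
final class DriverJobStats
{
    private final Map<String, String> stats = new LinkedHashMap<>();
    private final Consumer<Map<String, String>> publisher;

    DriverJobStats(Consumer<Map<String, String>> publisher)
    {
        this.publisher = publisher;
    }

    // Record a job property at any point while the job progresses.
    void record(String key, String value)
    {
        stats.put(key, value);
    }

    // Sum the collected task results, as the persist() change does.
    void recordSuccess(List<TaskResult> results)
    {
        long rows = results.stream().mapToLong(r -> r.rowCount).sum();
        long bytes = results.stream().mapToLong(r -> r.bytesWritten).sum();
        record("rowsWritten", Long.toString(rows));
        record("bytesWritten", Long.toString(bytes));
        record("jobStatus", "Succeeded");
    }

    void recordFailure(Throwable t)
    {
        record("jobStatus", "Failed");
        // getMessage() may be null, so fall back to the exception class name
        record("failureReason", t.getMessage() != null ? t.getMessage() : t.getClass().getName());
    }

    // Intended to be called once, e.g. from a finally block, for either outcome.
    void publish()
    {
        publisher.accept(Collections.unmodifiableMap(new LinkedHashMap<>(stats)));
    }
}
```

Publishing from a `finally` block, as the `persist` diff does, means both the success and the failure path emit exactly one set of stats.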
   
   ## Testing
   - Validated a successful build run with unit and integration tests.

