This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 66c38cdd CASSANALYTICS-53: Remove spark 2 code (#111)
66c38cdd is described below
commit 66c38cdd16c642c31d3ac85620cad50b145b63ac
Author: Yifan Cai <[email protected]>
AuthorDate: Fri May 23 09:55:48 2025 -0700
CASSANALYTICS-53: Remove spark 2 code (#111)
Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSANALYTICS-53
---
CHANGES.txt | 1 +
DEV-README.md | 32 +--
.../spark/sparksql/PartialRowBuilder.java | 4 +-
.../spark/sparksql/CassandraDataSource.java | 222 ---------------------
.../cassandra/spark/sparksql/LocalDataSource.java | 44 ----
.../cassandra/spark/sparksql/SparkRowIterator.java | 87 --------
.../spark/bulkwriter/TestTaskContext.java | 169 ----------------
.../spark/sparksql/PartitionKeyFilterTests.java | 124 ------------
.../spark/sparksql/SparkRangeFilterTests.java | 53 -----
.../SharedClusterSparkIntegrationTestBase.java | 4 +-
githooks/pre-push | 6 -
11 files changed, 7 insertions(+), 739 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 7092f15e..d7e60174 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.0.0
+ * Remove dead code for spark 2 (CASSANALYTICS-53)
* Add support for nested UDT in collections for bulk write (CASSANALYTICS-49)
* Add Sidecar Client (CASSANALYTICS-30)
* Add support for vnodes (CASSANALYTICS-50)
diff --git a/DEV-README.md b/DEV-README.md
index 2b3bec6c..1dd03a77 100644
--- a/DEV-README.md
+++ b/DEV-README.md
@@ -20,7 +20,7 @@
# Cassandra Analytics
-Cassandra Analytics supports Spark 2 (Scala 2.11 and 2.12) and Spark 3 (Scala 2.12).
+Cassandra Analytics supports Spark 3 (Scala 2.12).
This project uses Gradle as the dependency management and build framework.
@@ -54,7 +54,7 @@ variable for Cassandra dtest jars, and trunk for the sidecar.
Once you've built the dependencies, you're ready to build the analytics
project.
-Cassandra Analytics will build for Spark 2 and Scala 2.11 by default.
+Cassandra Analytics will build for Spark 3 and Scala 2.12 by default.
Navigate to the top-level directory for this project:
@@ -62,15 +62,6 @@ Navigate to the top-level directory for this project:
./gradlew clean assemble
```
-### Spark 2 and Scala 2.12
-
-To build for Scala 2.12, set the profile by exporting `SCALA_VERSION=2.12`:
-
-```shell
-export SCALA_VERSION=2.12
-./gradlew clean assemble
-```
-
### Spark 3 and Scala 2.12
To build for Spark 3 and Scala 2.12, export both `SCALA_VERSION=2.12` and `SPARK_VERSION=3`:
@@ -110,22 +101,3 @@ Run the following profile to copy code style used for this project:
```shell
./gradlew copyCodeStyle
```
-
-The project has different sources for Spark 2 and Spark 3.
-
-Spark 2 uses the `org.apache.spark.sql.sources.v2` APIs that have been deprecated in Spark 3.
-
-Spark 3 uses new APIs that live in the `org.apache.spark.sql.connector.read` namespace.
-
-By default, the project will load Spark 2 sources, but you can switch between sources by modifying the `gradle.properties` file.
-
-For Spark 3, use the following in `gradle.properties`:
-
-```properties
-scala=2.12
-spark=3
-```
-
-And then load Gradle changes (on Mac, the shortcut to load Gradle changes is <kbd>Command</kbd> + <kbd>Shift</kbd> + <kbd>I</kbd>).
-
-This will make the IDE pick up the Spark 3 sources, and you should now be able to develop against Spark 3 as well.
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/PartialRowBuilder.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/PartialRowBuilder.java
index f492f266..d97f660b 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/PartialRowBuilder.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/PartialRowBuilder.java
@@ -34,8 +34,8 @@ import org.jetbrains.annotations.NotNull;
/**
* PartialRowBuilder that builds row only containing fields in requiredSchema prune-column filter
- * NOTE: Spark 3 changed the contract from Spark 2 and requires us to only return the columns specified in
- * the requiredSchema 'prune column' filter and not a sparse Object[] array with null values for excluded columns
+ * NOTE: Spark 3 requires us to only return the columns specified in the requiredSchema 'prune column' filter
+ * and not a sparse Object[] array with null values for excluded columns
*
* @param <T> type of row returned by builder
*/
diff --git a/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/CassandraDataSource.java b/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/CassandraDataSource.java
deleted file mode 100644
index 4676804f..00000000
--- a/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/CassandraDataSource.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cassandra.spark.sparksql;
-
-import java.io.Serializable;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.bridge.CassandraBridgeFactory;
-import org.apache.cassandra.bridge.CassandraVersion;
-import org.apache.cassandra.spark.data.CassandraDataLayer;
-import org.apache.cassandra.spark.data.CassandraDataSourceHelper;
-import org.apache.cassandra.spark.data.ClientConfig;
-import org.apache.cassandra.spark.data.CqlField;
-import org.apache.cassandra.spark.data.DataLayer;
-import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
-import org.apache.cassandra.spark.utils.FilterUtils;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.DataSourceRegister;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.ReadSupport;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
-import org.apache.spark.sql.sources.v2.reader.partitioning.Distribution;
-import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
-import org.apache.spark.sql.types.StructType;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * A concrete implementation for the {@link CassandraDataSource}
- */
-public class CassandraDataSource implements DataSourceV2, ReadSupport,
DataSourceRegister
-{
- private static final Logger LOGGER =
LoggerFactory.getLogger(CassandraDataSource.class);
- private DataLayer dataLayer;
-
- public CassandraDataSource()
- {
-
CassandraBridgeFactory.validateBridges(CassandraVersion.implementedVersions());
- }
-
- @Override
- public String shortName()
- {
- return "cassandraBulkRead";
- }
-
- @Override
- public DataSourceReader createReader(DataSourceOptions options)
- {
- if (dataLayer == null)
- {
- dataLayer = getDataLayer(options);
- }
- return new SSTableSourceReader(dataLayer);
- }
-
- public DataLayer getDataLayer(DataSourceOptions options)
- {
- return CassandraDataSourceHelper.getDataLayer(options.asMap(),
this::initializeDataLayer);
- }
-
- @VisibleForTesting
- void initializeDataLayer(CassandraDataLayer dataLayer, ClientConfig config)
- {
- dataLayer.initialize(config);
- }
-
- public static class SSTableSourceReader
- implements DataSourceReader, Serializable,
SupportsPushDownFilters, SupportsPushDownRequiredColumns, Partitioning
- {
- private static final long serialVersionUID = -6622216100571485739L;
- private Filter[] pushedFilters = new Filter[0];
- private final DataLayer dataLayer;
- private StructType requiredSchema = null;
-
- SSTableSourceReader(@NotNull DataLayer dataLayer)
- {
- this.dataLayer = dataLayer;
- }
-
- @Override
- public StructType readSchema()
- {
- return dataLayer.structType();
- }
-
- @Override
- public List<InputPartition<InternalRow>> planInputPartitions()
- {
- List<PartitionKeyFilter> partitionKeyFilters = new ArrayList<>();
-
- List<String> partitionKeyColumnNames =
dataLayer.cqlTable().partitionKeys().stream()
-
.map(CqlField::name)
-
.collect(Collectors.toList());
- Map<String, List<String>> partitionKeyValues =
- FilterUtils.extractPartitionKeyValues(pushedFilters, new
HashSet<>(partitionKeyColumnNames));
- if (partitionKeyValues.size() > 0)
- {
- List<List<String>> orderedValues =
partitionKeyColumnNames.stream()
-
.map(partitionKeyValues::get)
-
.collect(Collectors.toList());
- FilterUtils.cartesianProduct(orderedValues).forEach(keys -> {
- AbstractMap.SimpleEntry<ByteBuffer, BigInteger> filterKey =
-
dataLayer.bridge().getPartitionKey(dataLayer.cqlTable(),
dataLayer.partitioner(), keys);
-
partitionKeyFilters.add(PartitionKeyFilter.create(filterKey.getKey(),
filterKey.getValue()));
- });
- }
- LOGGER.info("Creating data reader factories numPartitions={}",
dataLayer.partitionCount());
- return IntStream.range(0, dataLayer.partitionCount())
- .mapToObj(partitionId -> new
SerializableInputPartition(partitionId, dataLayer, requiredSchema,
partitionKeyFilters))
- .collect(Collectors.toList());
- }
-
- public static class SerializableInputPartition implements
InputPartition<InternalRow>
- {
- private static final long serialVersionUID = -7916492108742137769L;
- private final int partitionId;
- @NotNull
- private final DataLayer dataLayer;
- @NotNull
- private final StructType requiredSchema;
- @NotNull
- private final List<PartitionKeyFilter> partitionKeyFilters;
-
- public SerializableInputPartition(int partitionId,
- @NotNull DataLayer dataLayer,
- @NotNull StructType
requiredSchema,
- @NotNull
List<PartitionKeyFilter> partitionKeyFilters)
- {
- this.partitionId = partitionId;
- this.dataLayer = dataLayer;
- this.requiredSchema = requiredSchema;
- this.partitionKeyFilters = partitionKeyFilters;
- }
-
- @Override
- @NotNull
- public InputPartitionReader<InternalRow> createPartitionReader()
- {
- return new SparkRowIterator(partitionId, dataLayer,
requiredSchema, partitionKeyFilters);
- }
- }
-
- /**
- * Pushes down filters, and returns filters that need to be evaluated
after scanning
- *
- * @param filters the filters in the query
- * @return filters that need to be evaluated after scanning
- */
- @Override
- public Filter[] pushFilters(Filter[] filters)
- {
- Filter[] unsupportedFilters =
dataLayer.unsupportedPushDownFilters(filters);
-
- List<Filter> supportedFilters = Lists.newArrayList(filters);
- supportedFilters.removeAll(Arrays.asList(unsupportedFilters));
- pushedFilters = supportedFilters.stream().toArray(Filter[]::new);
-
- return unsupportedFilters;
- }
-
- @Override
- public Filter[] pushedFilters()
- {
- return pushedFilters;
- }
-
- @Override
- public void pruneColumns(StructType requiredSchema)
- {
- this.requiredSchema = requiredSchema;
- }
-
- @Override
- public int numPartitions()
- {
- return dataLayer.partitionCount();
- }
-
- @Override
- public boolean satisfy(Distribution distribution)
- {
- return true;
- }
- }
-}
diff --git a/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/LocalDataSource.java b/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/LocalDataSource.java
deleted file mode 100644
index 135ffe5b..00000000
--- a/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/LocalDataSource.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cassandra.spark.sparksql;
-
-import org.apache.cassandra.spark.data.DataLayer;
-import org.apache.cassandra.spark.data.LocalDataLayer;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.jetbrains.annotations.NotNull;
-
-@SuppressWarnings("unused")
-public class LocalDataSource extends CassandraDataSource
-{
- @Override
- @NotNull
- public String shortName()
- {
- return "localsstabledatasource";
- }
-
- @Override
- @NotNull
- public DataLayer getDataLayer(@NotNull DataSourceOptions options)
- {
- // options.asMap() returns the keyLowerCasedMap, therefore all the
keys need to be lower-cased
- return LocalDataLayer.from(options.asMap());
- }
-}
diff --git a/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/SparkRowIterator.java b/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/SparkRowIterator.java
deleted file mode 100644
index c870dd5d..00000000
--- a/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/SparkRowIterator.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cassandra.spark.sparksql;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.function.Function;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.cassandra.spark.config.SchemaFeature;
-import org.apache.cassandra.spark.data.DataLayer;
-import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
-import org.apache.spark.sql.types.StructType;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Wrapper iterator around SparkCellIterator to normalize cells into Spark SQL
rows
- */
-public class SparkRowIterator extends AbstractSparkRowIterator<InternalRow>
implements InputPartitionReader<InternalRow>
-{
- @VisibleForTesting
- public SparkRowIterator(int partitionId, @NotNull DataLayer dataLayer)
- {
- this(partitionId, dataLayer, null, new ArrayList<>());
- }
-
- protected SparkRowIterator(int partitionId,
- @NotNull DataLayer dataLayer,
- @Nullable StructType requiredSchema,
- @NotNull List<PartitionKeyFilter>
partitionKeyFilters)
- {
- super(partitionId,
- dataLayer,
- requiredSchema,
- partitionKeyFilters,
- (builder) -> decorate(builder, dataLayer.requestedFeatures()));
- }
-
- @Override
- @NotNull
- protected RowBuilder<InternalRow>
newBuilder(Function<RowBuilder<InternalRow>, RowBuilder<InternalRow>> decorator)
- {
- RowBuilder<InternalRow> builder = new FullRowBuilder<>(it.cqlTable(),
it.hasProjectedValueColumns(), this::rowBuilder);
- builder = decorator.apply(builder);
- builder.reset();
- return builder;
- }
-
- protected static RowBuilder<InternalRow> decorate(RowBuilder<InternalRow>
builder,
- List<SchemaFeature>
features)
- {
- for (SchemaFeature feature : features)
- {
- builder = feature.decorate(builder);
- }
-
- return builder;
- }
-
- @Override
- public InternalRow rowBuilder(Object[] result)
- {
- return new GenericInternalRow((result));
- }
-}
diff --git a/cassandra-analytics-core/src/test/spark2/org/apache/cassandra/spark/bulkwriter/TestTaskContext.java b/cassandra-analytics-core/src/test/spark2/org/apache/cassandra/spark/bulkwriter/TestTaskContext.java
deleted file mode 100644
index 06200a28..00000000
--- a/cassandra-analytics-core/src/test/spark2/org/apache/cassandra/spark/bulkwriter/TestTaskContext.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cassandra.spark.bulkwriter;
-
-import java.util.Properties;
-
-import org.apache.spark.TaskContext;
-import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.memory.TaskMemoryManager;
-import org.apache.spark.metrics.source.Source;
-import org.apache.spark.shuffle.FetchFailedException;
-import org.apache.spark.util.AccumulatorV2;
-import org.apache.spark.util.TaskCompletionListener;
-import org.apache.spark.util.TaskFailureListener;
-import scala.Option;
-import scala.collection.Seq;
-
-public class TestTaskContext extends TaskContext
-{
- @Override
- public boolean isCompleted()
- {
- return false;
- }
-
- @Override
- public boolean isInterrupted()
- {
- return false;
- }
-
- @Override
- @Deprecated
- public boolean isRunningLocally()
- {
- return false;
- }
-
- @Override
- public TaskContext addTaskCompletionListener(TaskCompletionListener
listener)
- {
- return null;
- }
-
- @Override
- public TaskContext addTaskFailureListener(TaskFailureListener listener)
- {
- return null;
- }
-
- @Override
- public int stageId()
- {
- return 0;
- }
-
- @Override
- public int stageAttemptNumber()
- {
- return 0;
- }
-
- @Override
- public int partitionId()
- {
- return 0;
- }
-
- @Override
- public int attemptNumber()
- {
- return 0;
- }
-
- @Override
- public long taskAttemptId()
- {
- return 0;
- }
-
- @Override
- public String getLocalProperty(String key)
- {
- return null;
- }
-
- @Override
- public TaskMetrics taskMetrics()
- {
- return null;
- }
-
- @Override
- public Seq<Source> getMetricsSources(String sourceName)
- {
- return null;
- }
-
- @Override
- public void killTaskIfInterrupted()
- {
- }
-
- @Override
- public Option<String> getKillReason()
- {
- return null;
- }
-
- @Override
- public TaskMemoryManager taskMemoryManager()
- {
- return null;
- }
-
- @Override
- public void registerAccumulator(AccumulatorV2 accumulator)
- {
- }
-
- @Override
- public void setFetchFailed(FetchFailedException fetchFailed)
- {
- }
-
- @Override
- public void markInterrupted(String reason)
- {
- }
-
- @Override
- public void markTaskFailed(Throwable error)
- {
- }
-
- @Override
- public void markTaskCompleted(Option<Throwable> error)
- {
- }
-
- @Override
- public Option<FetchFailedException> fetchFailed()
- {
- return null;
- }
-
- @Override
- public Properties getLocalProperties()
- {
- return null;
- }
-}
diff --git a/cassandra-analytics-core/src/test/spark2/org/apache/cassandra/spark/sparksql/PartitionKeyFilterTests.java b/cassandra-analytics-core/src/test/spark2/org/apache/cassandra/spark/sparksql/PartitionKeyFilterTests.java
deleted file mode 100644
index b38c7cc7..00000000
--- a/cassandra-analytics-core/src/test/spark2/org/apache/cassandra/spark/sparksql/PartitionKeyFilterTests.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cassandra.spark.sparksql;
-
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import com.google.common.collect.Range;
-import org.junit.jupiter.api.Test;
-
-import org.apache.cassandra.bridge.TokenRange;
-import org.apache.cassandra.spark.TestUtils;
-import org.apache.cassandra.spark.data.partitioner.CassandraRing;
-import org.apache.cassandra.spark.data.partitioner.Partitioner;
-import org.apache.cassandra.spark.data.partitioner.TokenPartitioner;
-import org.apache.cassandra.spark.reader.SparkSSTableReader;
-import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
-import org.apache.cassandra.spark.utils.RangeUtils;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.quicktheories.QuickTheory.qt;
-import static org.quicktheories.generators.SourceDSL.arbitrary;
-
-public class PartitionKeyFilterTests
-{
- @Test
- public void testValidFilter()
- {
- qt().forAll(TestUtils.bridges())
- .checkAssert(bridge -> {
- ByteBuffer key = bridge.aInt().serialize(10);
- BigInteger token = bridge.hash(Partitioner.Murmur3Partitioner,
key);
- PartitionKeyFilter filter = PartitionKeyFilter.create(key,
token);
-
- ByteBuffer diffKey = bridge.aInt().serialize(11);
- TokenRange inRange = TokenRange.singleton(token);
- TokenRange notInRange =
TokenRange.singleton(token.subtract(BigInteger.ONE));
- SparkSSTableReader reader = mock(SparkSSTableReader.class);
- when(reader.range()).thenReturn(TokenRange.singleton(token));
-
- assertTrue(filter.filter(key));
- assertFalse(filter.filter(diffKey));
- assertTrue(filter.overlaps(inRange));
- assertFalse(filter.overlaps(notInRange));
- assertTrue(filter.matches(key));
- assertFalse(filter.matches(diffKey));
- assertTrue(SparkSSTableReader.overlaps(reader,
filter.tokenRange()));
- });
- }
-
- @Test
- public void testEmptyKey()
- {
- assertThrows(IllegalArgumentException.class, () ->
-
PartitionKeyFilter.create(ByteBuffer.wrap(ByteBufferUtils.EMPTY),
-
BigInteger.ZERO));
- }
-
- @Test
- public void testTokenRing()
- {
- qt().forAll(TestUtils.bridges(), TestUtils.partitioners(),
arbitrary().pick(Arrays.asList(1, 3, 6, 12, 128)))
- .checkAssert((bridge, partitioner, numInstances) -> {
- CassandraRing ring = TestUtils.createRing(partitioner,
numInstances);
- TokenPartitioner tokenPartitioner = new TokenPartitioner(ring,
24, 24);
- List<BigInteger> boundaryTokens = IntStream.range(0,
tokenPartitioner.numPartitions())
-
.mapToObj(tokenPartitioner::getTokenRange)
- .map(range ->
Arrays.asList(range.lowerEndpoint(),
-
midPoint(range),
-
range.upperEndpoint()))
-
.flatMap(Collection::stream)
-
.collect(Collectors.toList());
- for (BigInteger token : boundaryTokens)
- {
- if (token.equals(partitioner.minToken()))
- {
- // minToken is excluded in the ring
- continue;
- }
- // Check boundary tokens only match 1 Spark token range
- PartitionKeyFilter filter =
PartitionKeyFilter.create(bridge.aInt().serialize(11), token);
- assertEquals(1, tokenPartitioner.subRanges().stream()
-
.map(RangeUtils::toTokenRange)
- .filter(filter::overlaps)
- .count());
- }
- });
- }
-
- private static BigInteger midPoint(Range<BigInteger> range)
- {
- return range.upperEndpoint()
- .subtract(range.lowerEndpoint())
- .divide(BigInteger.valueOf(2L))
- .add(range.lowerEndpoint());
- }
-}
diff --git a/cassandra-analytics-core/src/test/spark2/org/apache/cassandra/spark/sparksql/SparkRangeFilterTests.java b/cassandra-analytics-core/src/test/spark2/org/apache/cassandra/spark/sparksql/SparkRangeFilterTests.java
deleted file mode 100644
index 00e8231b..00000000
--- a/cassandra-analytics-core/src/test/spark2/org/apache/cassandra/spark/sparksql/SparkRangeFilterTests.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cassandra.spark.sparksql;
-
-import java.math.BigInteger;
-
-import org.junit.jupiter.api.Test;
-
-import org.apache.cassandra.bridge.TokenRange;
-import org.apache.cassandra.spark.reader.SparkSSTableReader;
-import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class SparkRangeFilterTests
-{
- @Test
- public void testValidFilter()
- {
- TokenRange connected = TokenRange.closed(BigInteger.ONE,
BigInteger.valueOf(2L));
- TokenRange notConnected = TokenRange.closed(BigInteger.valueOf(2L),
BigInteger.TEN);
-
- SparkRangeFilter filter =
SparkRangeFilter.create(TokenRange.closed(BigInteger.ZERO, BigInteger.ONE));
- SparkSSTableReader reader = mock(SparkSSTableReader.class);
- when(reader.range()).thenReturn(connected);
-
- assertTrue(filter.overlaps(connected));
- assertFalse(filter.overlaps(notConnected));
- assertTrue(filter.skipPartition(BigInteger.TEN));
- assertFalse(filter.skipPartition(BigInteger.ONE));
- assertTrue(SparkSSTableReader.overlaps(reader, filter.tokenRange()));
- }
-}
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java
index 465cbc9a..9a7d5889 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java
@@ -229,7 +229,7 @@ public abstract class SharedClusterSparkIntegrationTestBase extends SharedCluste
}
else if (o instanceof Seq) // can't differentiate between scala list
and set, both come here as Seq
{
- List<?> entries = JavaConverters.seqAsJavaList((Seq<?>) o);
+ List<?> entries = JavaConverters.seqAsJavaListConverter((Seq<?>)
o).asJava();
sb.append("{");
for (int i = 0; i < entries.size(); i++)
{
@@ -243,7 +243,7 @@ public abstract class SharedClusterSparkIntegrationTestBase extends SharedCluste
}
else if (o instanceof scala.collection.Map)
{
- Map<?, ?> map =
JavaConverters.mapAsJavaMap(((scala.collection.Map<?, ?>) o));
+ Map<?, ?> map =
JavaConverters.mapAsJavaMapConverter(((scala.collection.Map<?, ?>) o)).asJava();
for (Map.Entry<?, ?> entry : map.entrySet())
{
sb.append("{");
diff --git a/githooks/pre-push b/githooks/pre-push
index 5fad5af3..4583e70e 100755
--- a/githooks/pre-push
+++ b/githooks/pre-push
@@ -33,12 +33,6 @@ checkstyle() {
echo "Running pre-push hook..."
-# scala 2.11 && spark 2
-checkstyle 2.11 2
-
-# scala 2.12 && spark 2
-checkstyle 2.12 2
-
# scala 2.12 && spark 3
checkstyle 2.12 3
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]