HeartSaVioR commented on a change in pull request #1487:
URL: https://github.com/apache/iceberg/pull/1487#discussion_r494049778



##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitionPruning {
+
+  private static final Configuration CONF = new Configuration();
+  private static final HadoopTables TABLES = new HadoopTables(CONF);
+
+  @Parameterized.Parameters
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] { "parquet", false },
+        new Object[] { "parquet", true },
+        new Object[] { "avro", false },
+        new Object[] { "orc", false },
+        new Object[] { "orc", true },
+    };
+  }
+
+  private final String format;
+  private final boolean vectorized;
+
+  public TestPartitionPruning(String format, boolean vectorized) {
+    this.format = format;
+    this.vectorized = vectorized;
+  }
+
+  private static SparkSession spark = null;
+
+  private static Transform<Object, Integer> bucketTransform = Transforms.bucket(Types.IntegerType.get(), 3);
+  private static Transform<Object, Object> truncateTransform = Transforms.truncate(Types.StringType.get(), 5);
+  private static Transform<Object, Integer> hourTransform = Transforms.hour(Types.TimestampType.withZone());
+
+  @BeforeClass
+  public static void startSpark() {
+    TestPartitionPruning.spark = SparkSession.builder().master("local[2]").getOrCreate();
+    String optionKey = String.format("fs.%s.impl", CountOpenLocalFileSystem.scheme);
+
+    CONF.set(optionKey, CountOpenLocalFileSystem.class.getName());
+    spark.conf().set(optionKey, CountOpenLocalFileSystem.class.getName());
+    spark.udf().register("bucket3", (Integer num) -> bucketTransform.apply(num), DataTypes.IntegerType);
+    spark.udf().register("truncate5", (String str) -> truncateTransform.apply(str), DataTypes.StringType);
+    // NOTE: date transforms take the type long, not Timestamp
+    spark.udf().register("hour", (Timestamp ts) -> hourTransform.apply(
+        (Long) org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp(ts)),
+        DataTypes.IntegerType);
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestPartitionPruning.spark;
+    TestPartitionPruning.spark = null;
+    currentSpark.stop();
+  }
+
+  private static final Schema LOG_SCHEMA = new Schema(
+      Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+      Types.NestedField.optional(2, "date", Types.StringType.get()),
+      Types.NestedField.optional(3, "level", Types.StringType.get()),
+      Types.NestedField.optional(4, "message", Types.StringType.get()),
+      Types.NestedField.optional(5, "timestamp", Types.TimestampType.withZone())
+  );
+
+  private static final List<LogMessage> LOGS = ImmutableList.of(
+      LogMessage.debug("2020-02-02", "debug event 1", Timestamp.valueOf("2020-02-02 00:00:00")),

Review comment:
   Looks like, at least in Spark 2.4, having a Java 8 `Instant` as a field directly won't work via Java type inference on Java beans.
   
   So if I understand correctly, we should pass either a `long` or a `Timestamp` to Spark for the timestamp column, and TestSparkDataFile looks to leverage `long`. Would it be OK to change this to store a long (using your suggestion to get microseconds, and probably converting to milliseconds due to Spark 2.4) and pass the long value to Spark?
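   
   To make that concrete, here is roughly the micros/millis conversion I have in mind (a minimal sketch; the class name and the literal epoch value are made up for illustration, not part of the PR):
   
   ```java
   import java.sql.Timestamp;
   import java.util.concurrent.TimeUnit;
   
   public class TimestampConversionSketch {
     public static void main(String[] args) {
       // Hypothetical stored value: 2020-02-03T00:00:00Z as epoch microseconds,
       // the representation Iceberg timestamp transforms operate on.
       long epochMicros = 1_580_688_000_000_000L;
   
       // For Spark 2.4, convert down to milliseconds and hand over a java.sql.Timestamp
       // (or the long millis value itself), since Instant bean fields are not inferred.
       Timestamp forSpark = new Timestamp(TimeUnit.MICROSECONDS.toMillis(epochMicros));
   
       System.out.println(epochMicros + " micros -> " + forSpark);
     }
   }
   ```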

##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
##########
@@ -0,0 +1,354 @@
+  private static final List<LogMessage> LOGS = ImmutableList.of(
+      LogMessage.debug("2020-02-02", "debug event 1", Timestamp.valueOf("2020-02-02 00:00:00")),

Review comment:
   Just dealt with this in 7429dc0; not sure whether I followed your suggestion properly.

##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitionPruning {
+
+  private static final Configuration CONF = new Configuration();
+  private static final HadoopTables TABLES = new HadoopTables(CONF);
+
+  @Parameterized.Parameters
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] { "parquet", false },
+        new Object[] { "parquet", true },
+        new Object[] { "avro", false },
+        new Object[] { "orc", false },
+        new Object[] { "orc", true },
+    };
+  }
+
+  private final String format;
+  private final boolean vectorized;
+
+  public TestPartitionPruning(String format, boolean vectorized) {
+    this.format = format;
+    this.vectorized = vectorized;
+  }
+
+  private static SparkSession spark = null;
+  private static JavaSparkContext sparkContext = null;
+
+  private static Transform<Object, Integer> bucketTransform = Transforms.bucket(Types.IntegerType.get(), 3);
+  private static Transform<Object, Object> truncateTransform = Transforms.truncate(Types.StringType.get(), 5);
+  private static Transform<Object, Integer> hourTransform = Transforms.hour(Types.TimestampType.withoutZone());
+
+  @BeforeClass
+  public static void startSpark() {
+    TestPartitionPruning.spark = SparkSession.builder().master("local[2]").getOrCreate();
+    TestPartitionPruning.sparkContext = new JavaSparkContext(spark.sparkContext());
+
+    String optionKey = String.format("fs.%s.impl", CountOpenLocalFileSystem.scheme);
+    CONF.set(optionKey, CountOpenLocalFileSystem.class.getName());
+    spark.conf().set(optionKey, CountOpenLocalFileSystem.class.getName());
+    spark.conf().set("spark.sql.session.timeZone", "UTC");
+    spark.udf().register("bucket3", (Integer num) -> bucketTransform.apply(num), DataTypes.IntegerType);
+    spark.udf().register("truncate5", (String str) -> truncateTransform.apply(str), DataTypes.StringType);
+    // NOTE: date transforms take the type long, not Timestamp
+    spark.udf().register("hour", (Timestamp ts) -> hourTransform.apply(
+        org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp(ts)),
+        DataTypes.IntegerType);
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestPartitionPruning.spark;
+    TestPartitionPruning.spark = null;
+    currentSpark.stop();
+  }
+
+  private static final Schema LOG_SCHEMA = new Schema(
+      Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+      Types.NestedField.optional(2, "date", Types.StringType.get()),
+      Types.NestedField.optional(3, "level", Types.StringType.get()),
+      Types.NestedField.optional(4, "message", Types.StringType.get()),
+      Types.NestedField.optional(5, "timestamp", Types.TimestampType.withZone())
+  );
+
+  private static final List<LogMessage> LOGS = ImmutableList.of(
+      LogMessage.debug("2020-02-02", "debug event 1", getInstant("2020-02-02T00:00:00")),
+      LogMessage.info("2020-02-02", "info event 1", getInstant("2020-02-02T01:00:00")),
+      LogMessage.debug("2020-02-02", "debug event 2", getInstant("2020-02-02T02:00:00")),
+      LogMessage.info("2020-02-03", "info event 2", getInstant("2020-02-03T00:00:00")),
+      LogMessage.debug("2020-02-03", "debug event 3", getInstant("2020-02-03T01:00:00")),
+      LogMessage.info("2020-02-03", "info event 3", getInstant("2020-02-03T02:00:00")),
+      LogMessage.error("2020-02-03", "error event 1", getInstant("2020-02-03T03:00:00")),
+      LogMessage.debug("2020-02-04", "debug event 4", getInstant("2020-02-04T01:00:00")),
+      LogMessage.warn("2020-02-04", "warn event 1", getInstant("2020-02-04T02:00:00")),
+      LogMessage.debug("2020-02-04", "debug event 5", getInstant("2020-02-04T03:00:00"))
+  );
+
+  private static Instant getInstant(String timestampWithoutZone) {
+    Long epochMicros = (Long) Literal.of(timestampWithoutZone).to(Types.TimestampType.withoutZone()).value();
+    return Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(epochMicros));
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private PartitionSpec spec = PartitionSpec.builderFor(LOG_SCHEMA)
+      .identity("date")
+      .identity("level")
+      .bucket("id", 3)
+      .truncate("message", 5)
+      .hour("timestamp")
+      .build();
+
+  private Random random = new Random();
+
+  @Test
+  public void testPartitionPruningIdentityString() {
+    String filterCond = "date >= '2020-02-03' AND level = 'DEBUG'";
+    Predicate<Row> partCondition = (Row r) -> {
+      String date = r.getString(0);
+      String level = r.getString(1);
+      return date.compareTo("2020-02-03") >= 0 && level.equals("DEBUG");
+    };
+
+    runTest(filterCond, partCondition);
+  }
+
+  @Test
+  public void testPartitionPruningBucketingInteger() {
+    final int[] ids = new int[]{
+        LOGS.get(3).getId(),
+        LOGS.get(7).getId()
+    };
+    String condForIds = Arrays.stream(ids).mapToObj(String::valueOf)
+        .collect(Collectors.joining(",", "(", ")"));
+    String filterCond = "id in " + condForIds;
+    Predicate<Row> partCondition = (Row r) -> {
+      int bucketId = r.getInt(2);
+      Set<Integer> buckets = Arrays.stream(ids).map(bucketTransform::apply)
+          .boxed().collect(Collectors.toSet());
+      return buckets.contains(bucketId);
+    };
+
+    runTest(filterCond, partCondition);
+  }
+
+  @Test
+  public void testPartitionPruningTruncatedString() {
+    String filterCond = "message like 'info event%'";
+    Predicate<Row> partCondition = (Row r) -> {
+      String truncatedMessage = r.getString(3);
+      return truncatedMessage.equals("info ");
+    };
+
+    runTest(filterCond, partCondition);
+  }
+
+  @Test
+  public void testPartitionPruningTruncatedStringComparingValueShorterThanPartitionValue() {
+    String filterCond = "message like 'inf%'";
+    Predicate<Row> partCondition = (Row r) -> {
+      String truncatedMessage = r.getString(3);
+      return truncatedMessage.startsWith("inf");
+    };
+
+    runTest(filterCond, partCondition);
+  }
+
+  @Test
+  public void testPartitionPruningHourlyPartition() {
+    String filterCond;
+    if (spark.version().startsWith("2")) {
+      // Looks like from Spark 2 we need to compare timestamp with timestamp to push down the filter.
+      filterCond = "timestamp >= to_timestamp('2020-02-03T01:00:00')";
+    } else {
+      filterCond = "timestamp >= '2020-02-03T01:00:00'";
+    }
+    Predicate<Row> partCondition = (Row r) -> {
+      int hourValue = r.getInt(4);
+      Instant instant = getInstant("2020-02-03T01:00:00");
+      Integer hourValueToFilter = hourTransform.apply(TimeUnit.MILLISECONDS.toMicros(instant.toEpochMilli()));
+      return hourValue >= hourValueToFilter;
+    };
+
+    runTest(filterCond, partCondition);
+  }
+
+  private void runTest(String filterCond, Predicate<Row> partCondition) {
+    File originTableLocation = createTempDir();
+    Assert.assertTrue("Temp folder should exist", 
originTableLocation.exists());
+
+    Table table = createTable(originTableLocation);
+    Dataset<Row> logs = createTestDataset();
+    saveTestDatasetToTable(logs, table);
+
+    List<Row> expected = logs
+        .select("id", "date", "level", "message", "timestamp")
+        .filter(filterCond)
+        .orderBy("id")
+        .collectAsList();
+    Assert.assertFalse("Expected rows should be not empty", 
expected.isEmpty());
+
+    // remove records which may be recorded during storing to table
+    CountOpenLocalFileSystem.resetRecordsInPathPrefix(originTableLocation.getAbsolutePath());
+
+    List<Row> actual = spark.read()
+        .format("iceberg")
+        .option("vectorization-enabled", String.valueOf(vectorized))
+        .load(table.location())
+        .select("id", "date", "level", "message", "timestamp")
+        .filter(filterCond)
+        .orderBy("id")
+        .collectAsList();
+    Assert.assertFalse("Actual rows should not be empty", actual.isEmpty());
+
+    Assert.assertEquals("Rows should match", expected, actual);
+
+    assertAccessOnDataFiles(originTableLocation, table, partCondition);
+  }
+
+  private File createTempDir() {
+    try {
+      int rand = random.nextInt(1000000);
+      return temp.newFolder(String.format("logs-%d", rand));

Review comment:
   I didn't realize `temp.newFolder()` would just work for creating a unique dir. My bad.
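   
   For reference, roughly what relying on `temp.newFolder()` for uniqueness would look like (a standalone hypothetical test, not the PR code):
   
   ```java
   import java.io.File;
   import java.io.IOException;
   import org.junit.Assert;
   import org.junit.Rule;
   import org.junit.Test;
   import org.junit.rules.TemporaryFolder;
   
   public class TemporaryFolderSketch {
     @Rule
     public TemporaryFolder temp = new TemporaryFolder();
   
     @Test
     public void newFolderCreatesUniqueDirPerCall() throws IOException {
       // Each call creates a fresh, uniquely named directory under the rule's root,
       // so no manual random suffix (e.g. random.nextInt) is needed.
       File first = temp.newFolder();
       File second = temp.newFolder();
       Assert.assertNotEquals(first.getAbsolutePath(), second.getAbsolutePath());
       Assert.assertTrue(first.exists() && second.exists());
     }
   }
   ```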




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


