Hi all, I feel like this question is more Spark-dev related than Spark-user related. Please correct me if I'm wrong.
My project's data flow involves sampling records from data stored as a Parquet dataset. I've checked the DataFrames API, and it doesn't support pushing down user-defined predicates, only simple filter expressions. So I want to use Parquet's custom filter (predicate pushdown) feature while loading the data with newAPIHadoopFile. Simple filters constructed with the org.apache.parquet.filter2 API work fine, but a User Defined Predicate only works in `--master local` mode. When I run my test program, which hands a UDP class to parquet-mr, in yarn-client mode, I get a ClassNotFoundException. I suspect the issue is related to how class loading works from Parquet, or perhaps to the fact that the Spark executor processes load my jar from an HTTP server and some security policy interferes (the classpath shows that the jar URI is actually an HTTP URL, not a local file). I've also tried building an uber jar with all dependencies and shipping it with the Spark app, with no success. PS: I'm using Spark 1.5.1.
Here is the command line I'm using to submit the application:

SPARK_CLASSPATH=./lib/my-jar-with-dependencies.jar spark-submit \
  --master yarn-client --num-executors 3 --driver-memory 3G --executor-memory 2G \
  --executor-cores 1 \
  --jars ./lib/my-jar-with-dependencies.jar,./lib/snappy-java-1.1.2.jar,./lib/parquet-hadoop-1.7.0.jar,./lib/parquet-avro-1.7.0.jar,./lib/parquet-column-1.7.0.jar,/opt/cloudera/parcels/CDH/jars/avro-1.7.6-cdh5.4.0.jar,/opt/cloudera/parcels/CDH/jars/avro-mapred-1.7.6-cdh5.4.0-hadoop2.jar, \
  --class my.app.parquet.filters.tools.TestSparkApp \
  ./lib/my-jar-with-dependencies.jar \
  yarn-client \
  "/user/vvlad/2015/*/*/*/EVENTS"

Here is the code of my UDP class:

package my.app.parquet.filters.udp

import java.lang.{Integer => JInt}

import org.apache.parquet.filter2.predicate.{Statistics, UserDefinedPredicate}

import scala.util.Random

class SampleIntColumn(threshold: Double) extends UserDefinedPredicate[JInt] with Serializable {
  lazy val random = new Random()
  val myThreshold = threshold

  // Keep each record with probability myThreshold (random sampling).
  override def keep(value: JInt): Boolean = random.nextFloat() < myThreshold

  // Never prune whole row groups based on column statistics.
  override def canDrop(statistics: Statistics[JInt]): Boolean = false
  override def inverseCanDrop(statistics: Statistics[JInt]): Boolean = false

  override def toString: String = "%s(%f)".format(getClass.getName, myThreshold)
}

And the Spark app:

package my.app.parquet.filters.tools

import my.app.parquet.filters.udp.SampleIntColumn
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.avro.AvroReadSupport
import org.apache.parquet.filter2.dsl.Dsl.IntColumn
import org.apache.parquet.filter2.dsl.Dsl._
import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object TestSparkApp {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster(args(0)) // "local[2]", yarn-client, etc.
      .setAppName("Spark Scala App")
      .set("spark.executor.memory", "1g")
      .set("spark.rdd.compress", "true")
      .set("spark.storage.memoryFraction", "1")
    val sc = new SparkContext(conf)

    val job = new Job(sc.hadoopConfiguration)
    ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[GenericRecord]])

    // Push the user-defined sampling predicate down to parquet-mr.
    val sampler = new SampleIntColumn(0.05)
    val impField = IntColumn("impression")
    val pred: FilterPredicate = impField.filterBy(sampler)
    ParquetInputFormat.setFilterPredicate(job.getConfiguration, pred)
    println(job.getConfiguration.get("parquet.private.read.filter.predicate"))
    println(job.getConfiguration.get("parquet.private.read.filter.predicate.human.readable"))

    val records1 = sc.newAPIHadoopFile(
      args(1), // <path to parquet>
      classOf[ParquetInputFormat[GenericRecord]],
      classOf[Void],
      classOf[GenericRecord],
      job.getConfiguration
    ).map(_._2).cache()

    println("result count " + records1.count().toString)
    sc.stop()
  }
}

Here are the logs with the exception I'm getting:

15/10/19 11:14:43 INFO TaskSetManager: Starting task 21.0 in stage 0.0 (TID 0, hdp010........, NODE_LOCAL, 2815 bytes)
15/10/19 11:14:43 INFO TaskSetManager: Starting task 14.0 in stage 0.0 (TID 1, hdp042........, NODE_LOCAL, 2816 bytes)
15/10/19 11:14:43 INFO YarnClientSchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@hdp027........:43593/user/Executor#-832887318]) with ID 3
15/10/19 11:14:43 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 2, hdp027........, NODE_LOCAL, 2814 bytes)
15/10/19 11:14:44 INFO BlockManagerMasterEndpoint: Registering block manager hdp027........:64266 with 883.8 MB RAM, BlockManagerId(3, hdp027........, 64266)
15/10/19 11:14:44 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on hdp010........:23967 (size: 1516.0 B, free: 883.8 MB)
15/10/19 11:14:44 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on hdp042........:63034 (size: 1516.0 B, free: 883.8 MB)
15/10/19 11:14:45 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on hdp010........:23967 (size: 25.1 KB, free: 883.8 MB)
15/10/19 11:14:45 INFO TaskSetManager: Starting task 48.0 in stage 0.0 (TID 3, hdp010........, NODE_LOCAL, 2816 bytes)
15/10/19 11:14:45 WARN TaskSetManager: Lost task 21.0 in stage 0.0 (TID 0, hdp010........): java.lang.RuntimeException: java.io.IOException: Could not read object from config with key parquet.private.read.filter.predicate
    at org.apache.parquet.hadoop.ParquetInputFormat.getFilterPredicate(ParquetInputFormat.java:196)
    at org.apache.parquet.hadoop.ParquetInputFormat.getFilter(ParquetInputFormat.java:205)
    at org.apache.parquet.hadoop.ParquetInputFormat.createRecordReader(ParquetInputFormat.java:241)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:151)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:124)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:65)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Could not read object from config with key parquet.private.read.filter.predicate
    at org.apache.parquet.hadoop.util.SerializationUtil.readObjectFromConfAsBase64(SerializationUtil.java:102)
    at org.apache.parquet.hadoop.ParquetInputFormat.getFilterPredicate(ParquetInputFormat.java:194)
    ... 17 more
Caused by: java.lang.ClassNotFoundException: my.app.parquet.filters.udp.SampleIntColumn
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:274)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.parquet.hadoop.util.SerializationUtil.readObjectFromConfAsBase64(SerializationUtil.java:100)
    ... 18 more

Best Regards,
Vladimir Vladimirov
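For context on why I suspect class loading: as far as I can tell from the trace, SerializationUtil.readObjectFromConfAsBase64 boils down to plain Java serialization of the predicate into a Base64 string in the Hadoop Configuration, and ObjectInputStream.resolveClass resolves classes via Class.forName, i.e. against the executor JVM's application class loader rather than Spark's per-task loader that knows about jars shipped with --jars. Here is a minimal self-contained sketch of that round trip; FakePredicate and the helper names are illustrative only, not the real parquet-mr code:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Base64

// Illustrative stand-in for my UDP class
// (the real one is my.app.parquet.filters.udp.SampleIntColumn).
case class FakePredicate(threshold: Double) extends Serializable

// Roughly what happens on the driver side: Java-serialize the predicate
// and Base64-encode the bytes into a plain string in the Configuration.
def writeAsBase64(obj: java.io.Serializable): String = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  try out.writeObject(obj) finally out.close()
  Base64.getEncoder.encodeToString(bytes.toByteArray)
}

// Roughly what happens on the executor side: deserialization goes through
// ObjectInputStream.resolveClass -> Class.forName, which uses the JVM's
// application class loader; if the predicate class is only reachable via
// the HTTP-fetched --jars path, this is where ClassNotFoundException hits.
def readFromBase64(encoded: String): AnyRef = {
  val in = new ObjectInputStream(
    new ByteArrayInputStream(Base64.getDecoder.decode(encoded)))
  try in.readObject() finally in.close()
}

val restored = readFromBase64(writeAsBase64(FakePredicate(0.05)))
println(restored) // FakePredicate(0.05)
```

This would also explain why `--master local` works for me: there the driver and the "executor" share one JVM whose application classpath already contains my jar. That is my working assumption, not something I've verified in the Spark or parquet-mr sources.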