Re: [Spark SQL] Does Spark group small files
Yes, it bin-packs small files. That is a good thing, because you avoid having many small partitions, especially if you’re writing this data back out (i.e. it’s compacting as you read). The default partition size is 128MB, with a 4MB “cost” for opening each file. You can configure this using the settings defined here:
http://spark.apache.org/docs/latest/sql-performance-tuning.html#other-configuration-options

From: Yann Moisan
Date: Tuesday, November 13, 2018 at 3:28 PM
To: "user@spark.apache.org"
Subject: [Spark SQL] Does Spark group small files

Hello,

I'm using Spark 2.3.1. I have a job that reads 5,000 small Parquet files from S3. When I do a mapPartitions followed by a collect, only 278 tasks are used (I would have expected 5,000).

Does Spark group small files? If yes, what is the threshold for grouping? Is it configurable? Any link to the corresponding source code?

Rgds,
Yann.
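To make the bin-packing in the reply above concrete, here is a rough, self-contained sketch of how small files might be packed into read partitions. It is a simplified model, loosely following what Spark 2.x does for non-bucketed file scans, and not Spark's actual code. The 128MB and 4MB values correspond to the `spark.sql.files.maxPartitionBytes` and `spark.sql.files.openCostInBytes` settings. The class name, method names, and the parallelism of 8 are hypothetical, and the sketch assumes every file is smaller than the split size, so no file needs to be split.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified model of small-file bin-packing; not Spark's own code.
class BinPackSketch {
    static final long MB = 1024L * 1024L;

    // Target split size: capped by maxPartitionBytes, but shrunk when the
    // total input (including the per-file open cost) is small per core.
    static long maxSplitBytes(List<Long> fileSizes, int parallelism,
                              long maxPartitionBytes, long openCost) {
        long totalBytes = 0;
        for (long size : fileSizes) {
            totalBytes += size + openCost;
        }
        long bytesPerCore = totalBytes / parallelism;
        return Math.min(maxPartitionBytes, Math.max(openCost, bytesPerCore));
    }

    // Packs files (largest first) into partitions and returns how many
    // partitions, and therefore read tasks, result.
    static int countPartitions(List<Long> fileSizes, long maxSplitBytes, long openCost) {
        List<Long> sorted = new ArrayList<>(fileSizes);
        sorted.sort(Comparator.reverseOrder());
        int partitions = 0;
        long currentSize = 0;
        boolean open = false; // true once the current partition holds a file
        for (long size : sorted) {
            if (open && currentSize + size > maxSplitBytes) {
                partitions++;     // close the current partition
                currentSize = 0;
                open = false;
            }
            currentSize += size + openCost; // each file also "costs" openCost
            open = true;
        }
        if (open) {
            partitions++; // close the last partition
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Assumed workload: 5,000 files of 1MB each, default parallelism of 8.
        List<Long> files = new ArrayList<>(Collections.nCopies(5000, MB));
        long split = maxSplitBytes(files, 8, 128 * MB, 4 * MB);
        System.out.println(countPartitions(files, split, 4 * MB) + " read partitions");
    }
}
```

Under this model the task count depends on the real file sizes and on the cluster's default parallelism, so it will not necessarily reproduce the 278 tasks reported in the question. Lowering the open cost packs more files into each partition, while lowering the maximum partition size produces more, smaller partitions.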
[ANNOUNCE] Apache Toree 0.3.0-incubating Released
Apache Toree is a kernel for the Jupyter Notebook platform providing interactive and remote access to Apache Spark.

The Apache Toree community is pleased to announce the release of Apache Toree 0.3.0-incubating, which provides various bug fixes and the following enhancements:

* Fix JupyterLab support after the introduction of new cell metadata information
* Support for high-order functions
* Fix %Showtypes and %Truncate magics
* Added %ShowOutput magic to disable console output
* Added support for custom resolvers for %AddDeps magic
* Added support for predefined variables in SQL Statements
* Removed support for PySpark and SparkR in Toree (use specific kernels)

For more information about Apache Toree and to download the latest release, go to:
https://toree.incubator.apache.org/

For more information on how to use Apache Toree, please visit our documentation page:
https://toree.incubator.apache.org/docs/current/user/quick-start/

--
Luciano Resende
http://people.apache.org/~lresende
http://twitter.com/lresende1975
http://lresende.blogspot.com/

- To unsubscribe e-mail: user-unsubscr...@spark.apache.org
[ANNOUNCE] Apache Bahir 2.2.2 Released
Apache Bahir provides extensions to multiple distributed analytic platforms, extending their reach with a diversity of streaming connectors and SQL data sources.

The Apache Bahir community is pleased to announce the release of Apache Bahir 2.2.2, which provides the following extensions for Apache Spark 2.2.2:

- Apache CouchDB/Cloudant SQL data source
- Apache CouchDB/Cloudant Streaming
- Akka Streaming
- Akka Structured Streaming
- Google Cloud Pub/Sub Streaming connector
- MQTT Streaming
- MQTT Structured Streaming
- Twitter Streaming
- ZeroMQ Streaming

For more information about Apache Bahir and to download the latest release, go to:
https://bahir.apache.org

For more details on how to use Apache Bahir extensions in your application, please visit our documentation page:
https://bahir.apache.org/docs/spark/2.2.2/documentation/

The Apache Bahir PMC

--
Luciano Resende
http://people.apache.org/~lresende
http://twitter.com/lresende1975
http://lresende.blogspot.com/
[ANNOUNCE] Apache Bahir 2.1.3 Released
Apache Bahir provides extensions to multiple distributed analytic platforms, extending their reach with a diversity of streaming connectors and SQL data sources.

The Apache Bahir community is pleased to announce the release of Apache Bahir 2.1.3, which provides the following extensions for Apache Spark 2.1.3:

- Apache CouchDB/Cloudant SQL data source
- Apache CouchDB/Cloudant Streaming
- Akka Streaming
- Akka Structured Streaming
- Google Cloud Pub/Sub Streaming connector
- MQTT Streaming
- MQTT Structured Streaming
- Twitter Streaming
- ZeroMQ Streaming

For more information about Apache Bahir and to download the latest release, go to:
https://bahir.apache.org

For more details on how to use Apache Bahir extensions in your application, please visit our documentation page:
https://bahir.apache.org/docs/spark/2.1.3/documentation/

The Apache Bahir PMC

--
Luciano Resende
http://people.apache.org/~lresende
http://twitter.com/lresende1975
http://lresende.blogspot.com/
inferred schemas for spark streaming from a Kafka source
Does anybody know how to use inferred schemas with structured streaming?
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#schema-inference-and-partition-of-streaming-dataframesdatasets

I have some code like this:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, from_json}

    object StreamingApp {
      def launch(config: Config, spark: SparkSession): Unit = {
        import spark.implicits._

        // derive a schema by reading the JSON string held in config.schema
        val schemaJson = spark.sparkContext.parallelize(List(config.schema))
        val schemaDF = spark.read.json(schemaJson)
        schemaDF.printSchema()

        // read text from kafka
        val df = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", config.broker)
          .option("subscribe", config.topic)
          .option("startingOffsets", "earliest")
          .load()

        spark.sql("set spark.sql.streaming.schemaInference=true")

        // parse the Kafka value as JSON with the provided schema; fail on malformed records
        val jsonOptions = Map[String, String]("mode" -> "FAILFAST")
        val org_store_event_df = df.select(
            col("key").cast("string"),
            from_json(col("value").cast("string"), schemaDF.schema, jsonOptions))
          .writeStream
          .format("console")
          .start()
          .awaitTermination()
      }
    }

I'd like to compare an inferred schema against the one I provide, to determine what is missing from my provided schema or why I end up with all nulls in my value column.

Currently I'm using a schema that I read from a JSON file, but I'd like to infer the schema from the stream, as suggested by the docs. I'm not sure how to replace from_json so that the value column is read using an inferred schema.

Maybe it's not supported for Kafka streams and only for file streams? If that is the case, why do they have different implementations? Also, shouldn't we make the documentation clearer?
[Spark SQL] Does Spark group small files
Hello,

I'm using Spark 2.3.1. I have a job that reads 5,000 small Parquet files from S3. When I do a mapPartitions followed by a collect, only *278* tasks are used (I would have expected 5,000).

Does Spark group small files? If yes, what is the threshold for grouping? Is it configurable? Any link to the corresponding source code?

Rgds,
Yann.
Failed to convert java.sql.Date to String
Hi all,

I'm new to Spark SQL and have just started using it in our project. We are using Spark 2.

When importing data from a Hive table, I got the following error:

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, processing_dttm), StringType), true) AS processing_dttm#91
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
    at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
    at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
    at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
    at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.sql.Date is not a valid external type for schema of string
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfFalseExpr1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287)
    ... 21 more

This is related to the following lines in our code (actually, it's code from a third party):

    if (dataType != null && dataType.isDateOrTimestamp()) {
        field = new StructField(field.name(), DataTypes.StringType, field.nullable(), field.metadata());
    }

Does anyone know why this happens, and which types can be converted to string?

Thanks,
Boying
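One reading of the error above is that the quoted snippet only relabels the field as StringType in the schema, while the row still carries a java.sql.Date object. When Spark encodes an external Row, a StringType column must hold a java.lang.String. The sketch below shows one possible way to convert the values as well, assuming you control the code that builds the rows. It is plain Java with no Spark dependency, and the class and method names are hypothetical.

```java
import java.sql.Date;
import java.sql.Timestamp;

// Hypothetical helper: converts date/timestamp values to strings so that they
// match a schema whose date/timestamp fields were relabeled as StringType.
class DateToStringSketch {

    // Returns the string form of a java.sql.Date or java.sql.Timestamp;
    // any other value (including null) is returned unchanged.
    static Object toStringIfDateLike(Object value) {
        if (value instanceof Date || value instanceof Timestamp) {
            return value.toString();
        }
        return value;
    }

    // Applies the conversion to every value of a row before the row is built.
    static Object[] normalizeRow(Object[] values) {
        Object[] out = new Object[values.length];
        for (int i = 0; i < values.length; i++) {
            out[i] = toStringIfDateLike(values[i]);
        }
        return out;
    }

    public static void main(String[] args) {
        Object[] row = {1L, "abc", null, Date.valueOf("2018-11-13")};
        for (Object v : normalizeRow(row)) {
            String type = (v == null) ? "null" : v.getClass().getSimpleName();
            System.out.println(v + " (" + type + ")");
        }
    }
}
```

If the data is already in a DataFrame, an alternative is to leave the schema as it is and cast the column instead, for example with `col("processing_dttm").cast("string")`.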