Re: [Spark SQL] Does Spark group small files

2018-11-13 Thread Silvio Fiorito
Yes, it does bin-packing for small files, which is a good thing: you avoid 
having many small partitions, especially if you're writing this data back out 
(i.e. it's compacting as you read). The default maximum partition size is 128MB, 
with a 4MB "cost" charged for opening each file. You can configure this using the 
settings described here: 
http://spark.apache.org/docs/latest/sql-performance-tuning.html#other-configuration-options
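
For reference, a minimal sketch of tuning those two settings 
(spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes) when 
building the session; the values below are illustrative only, not recommendations:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("small-file-read")
  // target at most ~256MB of input per read partition (default is 128MB)
  .config("spark.sql.files.maxPartitionBytes", 256L * 1024 * 1024)
  // estimated per-file "open cost" in bytes (default is 4MB); a higher value
  // packs more small files into each partition
  .config("spark.sql.files.openCostInBytes", 8L * 1024 * 1024)
  .getOrCreate()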

From: Yann Moisan 
Date: Tuesday, November 13, 2018 at 3:28 PM
To: "user@spark.apache.org" 
Subject: [Spark SQL] Does Spark group small files

Hello,

I'm using Spark 2.3.1.

I have a job that reads 5,000 small Parquet files from S3.

When I do a mapPartitions followed by a collect, only 278 tasks are used (I 
would have expected 5000). Does Spark group small files? If yes, what is the 
threshold for grouping? Is it configurable? Any link to the corresponding source 
code?

Rgds,

Yann.


[ANNOUNCE] Apache Toree 0.3.0-incubating Released

2018-11-13 Thread Luciano Resende
Apache Toree is a kernel for the Jupyter Notebook platform providing
interactive and remote access to Apache Spark.

The Apache Toree community is pleased to announce the release of
Apache Toree 0.3.0-incubating which provides various bug fixes and the
following enhancements.

   * Fix JupyterLab support after the introduction of new cell
metadata information
   * Support for higher-order functions
   * Fix %Showtypes and %Truncate magics
   * Added %ShowOutput magic to disable console output
   * Added support for custom resolvers for %AddDeps magic
   * Added support for predefined variables in SQL Statements
   * Removed support for PySpark and Spark R in Toree (use specific kernels)

For more information about Apache Toree and to download the latest
release, go to:

   https://toree.incubator.apache.org/

For more information on how to use Apache Toree please visit our
documentation page:

   https://toree.incubator.apache.org/docs/current/user/quick-start/

-- 
Luciano Resende
http://people.apache.org/~lresende
http://twitter.com/lresende1975
http://lresende.blogspot.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[ANNOUNCE] Apache Bahir 2.2.2 Released

2018-11-13 Thread Luciano Resende
Apache Bahir provides extensions to multiple distributed analytic
platforms, extending their reach with a diversity of streaming
connectors and SQL data sources.

The Apache Bahir community is pleased to announce the release of
Apache Bahir 2.2.2 which provides the following extensions for Apache
Spark 2.2.2:

   - Apache CouchDB/Cloudant SQL data source
   - Apache CouchDB/Cloudant Streaming
   - Akka Streaming
   - Akka Structured Streaming
   - Google Cloud Pub/Sub Streaming connector
   - MQTT Streaming
   - MQTT Structured Streaming
   - Twitter Streaming
   - ZeroMQ Streaming

For more information about Apache Bahir and to download the
latest release go to:

https://bahir.apache.org

For more details on how to use Apache Bahir extensions in your
application, please visit our documentation page:

   https://bahir.apache.org/docs/spark/2.2.2/documentation/

The Apache Bahir PMC

-- 
Luciano Resende
http://people.apache.org/~lresende
http://twitter.com/lresende1975
http://lresende.blogspot.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[ANNOUNCE] Apache Bahir 2.1.3 Released

2018-11-13 Thread Luciano Resende
Apache Bahir provides extensions to multiple distributed analytic
platforms, extending their reach with a diversity of streaming
connectors and SQL data sources.

The Apache Bahir community is pleased to announce the release of
Apache Bahir 2.1.3 which provides the following extensions for Apache
Spark 2.1.3:

   - Apache CouchDB/Cloudant SQL data source
   - Apache CouchDB/Cloudant Streaming
   - Akka Streaming
   - Akka Structured Streaming
   - Google Cloud Pub/Sub Streaming connector
   - MQTT Streaming
   - MQTT Structured Streaming
   - Twitter Streaming
   - ZeroMQ Streaming

For more information about Apache Bahir and to download the
latest release go to:

https://bahir.apache.org

For more details on how to use Apache Bahir extensions in your
application, please visit our documentation page:

   https://bahir.apache.org/docs/spark/2.1.3/documentation/

The Apache Bahir PMC

-- 
Luciano Resende
http://people.apache.org/~lresende
http://twitter.com/lresende1975
http://lresende.blogspot.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



inferred schemas for spark streaming from a Kafka source

2018-11-13 Thread Colin Williams
Does anybody know how to use inferred schemas with structured
streaming: 
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#schema-inference-and-partition-of-streaming-dataframesdatasets

I have some code like:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}

object StreamingApp {

  def launch(config: Config, spark: SparkSession): Unit = {
    import spark.implicits._

    // build the provided schema from the JSON sample carried in the config
    val schemaJson = spark.sparkContext.parallelize(List(config.schema))
    val schemaDF = spark.read.json(schemaJson)
    schemaDF.printSchema()

    // read records from kafka
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", config.broker)
      .option("subscribe", config.topic)
      .option("startingOffsets", "earliest")
      .load()

    spark.sql("set spark.sql.streaming.schemaInference=true")

    val jsonOptions = Map[String, String]("mode" -> "FAILFAST")

    // parse the value column with the provided schema and write to the console
    df.select(
        col("key").cast("string"),
        from_json(col("value").cast("string"), schemaDF.schema, jsonOptions))
      .writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}

I'd like to compare an inferred schema against the one I provide, to
determine what is missing from my provided schema, or why I end up with
all nulls in my value column.

Currently I'm using a schema read from a JSON file, but I'd like to infer
the schema from the stream as suggested by the docs. I'm then not sure how
to replace from_json so that the value column is read using an inferred
schema, or otherwise.
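
As far as I understand, spark.sql.streaming.schemaInference only applies to
file-based sources, and the Kafka source always exposes a fixed key/value
schema, so from_json with an explicit schema is still needed on the value
column. A rough, hedged sketch of one workaround is to infer a schema from a
bounded batch read of the same topic and diff it against the provided one
(assuming spark and schemaDF from the snippet above; the broker and topic
strings are placeholders):

import org.apache.spark.sql.functions.col
import spark.implicits._

// batch read of the topic (not readStream): earliest offsets up to the
// latest available at call time
val sampleJson = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
  .option("subscribe", "my-topic")                    // placeholder
  .load()
  .select(col("value").cast("string"))
  .as[String]

// let Spark infer a schema from the sampled JSON payloads
val inferredSchema = spark.read.json(sampleJson).schema

// field-level diff against the provided schema
val providedFields = schemaDF.schema.fieldNames.toSet
val missingFromProvided = inferredSchema.fieldNames.toSet -- providedFields
println(s"Fields in the data but not in the provided schema: $missingFromProvided")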

Maybe it's only supported for file streams and not for Kafka streams?
If that is the case, why do they have different implementations?

Also shouldn't we make the documentation more clear?

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[Spark SQL] Does Spark group small files

2018-11-13 Thread Yann Moisan
Hello,

I'm using Spark 2.3.1.

I have a job that reads 5,000 small Parquet files from S3.

When I do a mapPartitions followed by a collect, only *278* tasks are used
(I would have expected 5000). Does Spark group small files? If yes, what
is the threshold for grouping? Is it configurable? Any link to the
corresponding source code?

Rgds,

Yann.


Failed to convert java.sql.Date to String

2018-11-13 Thread luby
Hi, All,

I'm new to Spark SQL and have just started to use it in our project. We are 
using Spark 2.

When importing data from a Hive table, I got the following error:

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, processing_dttm), StringType), true) AS processing_dttm#91
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
    at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
    at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
    at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
    at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.sql.Date is not a valid external type for schema of string
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfFalseExpr1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287)
    ... 21 more

This is related to the following lines in our code (actually it's code from 
a third party):

if (dataType != null && dataType.isDateOrTimestamp()) {
    field = new StructField(field.name(), DataTypes.StringType,
        field.nullable(), field.metadata());
}
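
The error message suggests what happens here: after that substitution the 
schema declares processing_dttm as StringType, while the Row still carries a 
java.sql.Date value. A minimal Scala sketch of one possible workaround, casting 
the column to a string before the string-typed schema is applied (assuming 
spark is the active SparkSession; the table name is a placeholder):

import org.apache.spark.sql.functions.col

// cast date/timestamp columns to string values so the data matches a schema
// that declares them as StringType
val sourceDF = spark.table("my_hive_table")        // placeholder table name
val fixedDF = sourceDF.withColumn(
  "processing_dttm",                               // column named in the stack trace
  col("processing_dttm").cast("string"))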

Does anyone know why this happens and what kinds of types can be converted 
to string?

Thanks 

Boying



 