How to know that a partition is ready when using Structured Streaming

2019-01-16 Thread Wayne Guo
When using Structured Streaming, we use the "partitionBy" API to partition the
output data and an event-time watermark to handle late records. But how can we
tell downstream consumers that a partition is ready? For example, when should
we write an empty "hadoop.done" marker file into a partition directory?
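
There is no built-in "partition ready" signal; a common pattern is to write the
marker yourself from foreachBatch (Spark 2.4+). A minimal sketch, assuming a
rate source stands in for the real stream and a purely date-based readiness
rule (the dayIsClosed helper and all paths are illustrative assumptions, not a
Spark API):

'''
import java.time.{LocalDate, ZoneOffset}
import java.time.format.DateTimeFormatter
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[2]").appName("partition-done").getOrCreate()
import spark.implicits._

// Stand-in source; only an event-time column and a day partition column matter here.
val events = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
  .withColumnRenamed("timestamp", "eventTime")
  .withColumn("day", date_format($"eventTime", "yyyyMMdd"))

// Hypothetical readiness rule: a day partition is "closed" once that UTC day is over.
// A stricter rule could read the watermark from query.lastProgress instead.
def dayIsClosed(day: String): Boolean =
  day < LocalDate.now(ZoneOffset.UTC).format(DateTimeFormatter.BASIC_ISO_DATE)

// An explicit function value avoids the Scala 2.12 foreachBatch overload ambiguity.
val writeBatch: (DataFrame, Long) => Unit = (batch, _) => {
  batch.write.mode("append").partitionBy("day").parquet("/tmp/events-out")
  val fs = FileSystem.get(batch.sparkSession.sparkContext.hadoopConfiguration)
  for (day <- batch.select("day").distinct().collect().map(_.getString(0)) if dayIsClosed(day))
    fs.create(new Path(s"/tmp/events-out/day=$day/hadoop.done"), true).close()   // empty marker
}

val query = events
  .withWatermark("eventTime", "1 hour")
  .writeStream
  .option("checkpointLocation", "/tmp/chk-partition-done")
  .foreachBatch(writeBatch)
  .start()
'''

A marker written this way is only as reliable as the readiness rule: data
arriving later than the watermark would still land in a partition already
marked done, so downstream consumers should read "hadoop.done" as "watermark
passed", not "immutable".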






Subscribe

2019-01-16 Thread Vasu Devan



Re: How to force-quit a Spark application?

2019-01-16 Thread Marcelo Vanzin
Those are daemon threads and not the cause of the problem. The main
thread is waiting for the "org.apache.hadoop.util.ShutdownHookManager"
thread, but I don't see that one in your list.
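
For context: during System.exit the JVM runs its shutdown hooks and the "main"
thread joins the hook threads, so a single blocking hook hangs the exit in
exactly this way. A contrived sketch against Hadoop's real ShutdownHookManager
API; the never-returning hook is purely illustrative, not anyone's actual code:

'''
import org.apache.hadoop.util.ShutdownHookManager

// Hadoop components (e.g. the FileSystem cache closer) register hooks here.
// A hook that blocks keeps "main" waiting in Thread.join during System.exit.
ShutdownHookManager.get().addShutdownHook(new Runnable {
  override def run(): Unit = Thread.sleep(Long.MaxValue)   // never returns
}, 10)   // 10 = hook priority; higher-priority hooks run earlier
'''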


Re: How to force-quit a Spark application?

2019-01-16 Thread Pola Yao
Hi Marcelo,

Thanks for your response.

I have dumped the threads on the server where I submitted the Spark
application:

'''
...
"dispatcher-event-loop-2" #28 daemon prio=5 os_prio=0
tid=0x7f56cee0e000 nid=0x1cb6 waiting on condition [0x7f5699811000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0006400161b8> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at
org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

"dispatcher-event-loop-1" #27 daemon prio=5 os_prio=0
tid=0x7f56cee0c800 nid=0x1cb5 waiting on condition [0x7f5699912000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0006400161b8> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at
org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

"dispatcher-event-loop-0" #26 daemon prio=5 os_prio=0
tid=0x7f56cee0c000 nid=0x1cb4 waiting on condition [0x7f569a12]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0006400161b8> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at
org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

"Service Thread" #20 daemon prio=9 os_prio=0 tid=0x7f56cc12d800
nid=0x1ca5 runnable [0x]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread14" #19 daemon prio=9 os_prio=0 tid=0x7f56cc12a000
nid=0x1ca4 waiting on condition [0x]
   java.lang.Thread.State: RUNNABLE
...
"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x7f56cc0ce000 nid=0x1c93 in
Object.wait() [0x7f56ab3f2000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
- locked <0x0006400cd498> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x7f56cc0c9800
nid=0x1c92 in Object.wait() [0x7f55cfffe000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
- locked <0x0006400a2660> (a java.lang.ref.Reference$Lock)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"main" #1 prio=5 os_prio=0 tid=0x7f56cc021000 nid=0x1c74 in
Object.wait() [0x7f56d344c000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1249)
- locked <0x00064056f6a0> (a
org.apache.hadoop.util.ShutdownHookManager$1)
at java.lang.Thread.join(Thread.java:1323)
at
java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106)
at
java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46)
at java.lang.Shutdown.runHooks(Shutdown.java:123)
at java.lang.Shutdown.sequence(Shutdown.java:167)
at java.lang.Shutdown.exit(Shutdown.java:212)
- locked <0x0006404e65b8> (a java.lang.Class for java.lang.Shutdown)
at java.lang.Runtime.exit(Runtime.java:109)
at java.lang.System.exit(System.java:971)
at scala.sys.package$.exit(package.scala:40)
at scala.sys.package$.exit(package.scala:33)
at

Re: How to force-quit a Spark application?

2019-01-16 Thread Marcelo Vanzin
If System.exit() doesn't work, you may have a bigger problem
somewhere. Check your threads (using e.g. jstack) to see what's going
on.
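
As a sketch of the same diagnostic from inside the JVM (standard
java.lang.management API, useful when jstack is not available on the box),
assuming you can run a few lines inside the stuck application:

'''
import java.lang.management.ManagementFactory

// Prints every live thread's stack plus held monitors and ownable
// synchronizers, roughly what `jstack <pid>` shows.
ManagementFactory.getThreadMXBean
  .dumpAllThreads(true, true)
  .foreach(println)
'''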




-- 
Marcelo




Re: How to force-quit a Spark application?

2019-01-16 Thread Pola Yao
Hi Marcelo,

Thanks for your reply! It made sense to me. However, I've tried many ways
to exit Spark (e.g., System.exit()), but failed. Is there an explicit way
to shut down all the live threads in the Spark application and then quit?


On Tue, Jan 15, 2019 at 2:38 PM Marcelo Vanzin  wrote:

> You should check the active threads in your app. Since your pool uses
> non-daemon threads, that will prevent the app from exiting.
>
> spark.stop() should have stopped the Spark jobs in other threads, at
> least. But if something is blocking one of those threads, or if
> something is creating a non-daemon thread that stays alive somewhere,
> you'll see that.
>
> Or you can force quit with sys.exit.
>
> On Tue, Jan 15, 2019 at 1:30 PM Pola Yao  wrote:
> >
> > I submitted a Spark job through the ./spark-submit command; the code
> executed successfully, however, the application got stuck when trying to
> quit Spark.
> >
> > My code snippet:
> > '''
> > {
> >
> > val spark = SparkSession.builder.master(...).getOrCreate
> >
> > val pool = Executors.newFixedThreadPool(3)
> > implicit val xc = ExecutionContext.fromExecutorService(pool)
> > val taskList = List(train1, train2, train3)  // each train* is a Future
> wrapping data reading, feature engineering, and machine learning steps
> > val results = Await.result(Future.sequence(taskList), 20 minutes)
> >
> > println("Shutting down pool and executor service")
> > pool.shutdown()
> > xc.shutdown()
> >
> > println("Exiting spark")
> > spark.stop()
> >
> > }
> > '''
> >
> > After I submitted the job, I could see from the terminal that the code
> executed and printed "Exiting spark"; however, after printing that line it
> never exited Spark, it just got stuck.
> >
> > Does anybody know what the reason is? Or how do I force it to quit?
> >
> > Thanks!
> >
> >
>
>
> --
> Marcelo
>
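
Putting Marcelo's two suggestions together, a sketch rather than the thread's
verified fix: build the pool from daemon threads so it can never keep the JVM
alive, await termination explicitly, and keep sys.exit(0) as a last resort.
The train function below is a placeholder for the original train1/train2/train3
futures:

'''
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").getOrCreate()

val daemonFactory = new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r)
    t.setDaemon(true)   // daemon threads do not block JVM exit
    t
  }
}
val pool = Executors.newFixedThreadPool(3, daemonFactory)
implicit val xc: ExecutionContext = ExecutionContext.fromExecutorService(pool)

def train(n: Int): Future[Int] = Future { n * n }   // placeholder work
val results = Await.result(Future.sequence(List(train(1), train(2), train(3))), 20.minutes)

pool.shutdown()
pool.awaitTermination(1, TimeUnit.MINUTES)
spark.stop()
sys.exit(0)   // still runs shutdown hooks; if one hangs, the jstack dump shows it
// Runtime.getRuntime.halt(0) would skip shutdown hooks entirely (truly last resort)
'''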


Re: How to unsubscribe???

2019-01-16 Thread Trevor News
Hi Junior,
After you send an email to user-unsubscr...@spark.apache.org, you should
receive an email with instructions to confirm. You will be asked to send
another email using the link in that second email. Only when that step is
complete will the unsubscribe take effect.

Please check your Spam, Junk, or Clutter folders. Unless you confirm, the
subscription will continue.

Hope that helps. 

Trevor

Sent from my iPhone

> On Jan 16, 2019, at 12:22 AM, Junior Alvarez  
> wrote:
> 
> Hi!
>  
> I've been sending unsubscribe mails to this address:
> user-unsubscr...@spark.apache.org, for the last few months, and I still
> don't manage to unsubscribe. Why???
>  
> B r
> /Junior


Re: [ANNOUNCE] Announcing Apache Spark 2.2.3

2019-01-16 Thread Takeshi Yamamuro
Thanks, Dongjoon!


On Wed, Jan 16, 2019 at 5:23 PM Hyukjin Kwon  wrote:

> Nice!
>
> On Wed, Jan 16, 2019 at 11:55 AM, Jiaan Geng wrote:
>
>> Glad to hear this.

-- 
---
Takeshi Yamamuro


Re: cache table vs. parquet table performance

2019-01-16 Thread Jörn Franke
I believe the in-memory solution misses the storage indexes that Parquet/ORC
have.

The in-memory solution is more suitable if you iterate over the whole data set
frequently.

> On Jan 15, 2019, at 19:20, Tomas Bartalos wrote:
> 
> Hello,
> 
> I'm using the Spark Thrift server and I'm searching for the best-performing
> solution to query a hot set of data. I'm processing records with a nested
> structure, containing subtypes and arrays; one record takes up several KB.
> 
> I tried to make some improvement with cache table:
> cache table event_jan_01 as select * from events where day_registered = 
> 20190102;
> 
> If I understood correctly, the data should be stored in in-memory columnar
> format with storage level MEMORY_AND_DISK, so data which doesn't fit in
> memory will be spilled to disk (I assume also in columnar format?).
> I cached one day of data (1 M records) and, according to the Spark UI
> storage tab, none of the data was cached to memory and everything was
> spilled to disk. The size of the data was 5.7 GB.
> Typical queries took ~20 sec.
> 
> Then I tried to store the data to parquet format:
> CREATE TABLE event_jan_01_par USING parquet location "/tmp/events/jan/02" as 
> select * from event_jan_01;
> 
> The whole parquet took up only 178MB.
> And typical queries took 5-10 sec.
> 
> Is it possible to tune Spark to spill the cached data in Parquet format?
> Why was the whole cached table spilled to disk, with nothing staying in
> memory?
> 
> Spark version: 2.4.0
> 
> Best regards,
> Tomas
> 
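
On the two closing questions: Spark has no mode that spills the table cache as
Parquet; what spills to disk is Spark's own serialized columnar blocks. What
you can choose is the storage level. A sketch assuming Spark 2.3+, where the
Catalog API takes an explicit level; the view name and filter are taken from
the mail above:

'''
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder.getOrCreate()

spark.sql(
  """CREATE OR REPLACE TEMPORARY VIEW event_jan_01 AS
    |SELECT * FROM events WHERE day_registered = 20190102""".stripMargin)

// Since Spark 2.3 the Catalog API accepts an explicit storage level; MEMORY_ONLY
// never spills: blocks that don't fit are simply recomputed on access.
spark.catalog.cacheTable("event_jan_01", StorageLevel.MEMORY_ONLY)
spark.table("event_jan_01").count()   // materialize, so the storage tab reflects it
'''

Whether the hot set then stays in memory is a sizing question: this data is
5.7 GB as an in-memory columnar cache against 178 MB as compressed Parquet, so
if the executors' storage memory cannot hold it, reading the Parquet copy (or
an external tier such as Alluxio, as Todd suggests in the next message) may
simply win.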


Re: cache table vs. parquet table performance

2019-01-16 Thread Todd Nist
Hi Tomas,

Have you considered using something like https://www.alluxio.org/ for your
cache? It seems like a possible solution for what you're trying to do.

-Todd

On Tue, Jan 15, 2019 at 11:24 PM 大啊  wrote:

> Hi, Tomas.
> Thanks for your question, it gave me something to think about. But caching
> generally works best for smaller data sets; caching large data consumes too
> much memory or disk space.
> Spilling the cached data in Parquet format might be a good improvement.


[Spark SQL]: how is “Exchange hashpartitioning” working in spark

2019-01-16 Thread nkx
Hi,

I have a dataset which I want to write sorted into Parquet files, so that
requests over these files through Spark afterwards can benefit from predicate
pushdown.

Currently I use repartition by column with a fixed number of partitions to
move the data to its particular partition. The column identifies the
corresponding partition (from 0 to a fixed n). The result is that Spark
produces something unexpected and creates fewer partitions (some of them are
empty). Maybe a hash collision?

To solve the problem, I tried to find out the reason and looked for
workarounds. I found one workaround: transforming the DataFrame to an RDD and
using partitionBy with a HashPartitioner. To my surprise, I got the expected
results. But converting a DataFrame to an RDD is not a solution for me,
because it takes too many resources.

I have tested this on:
- Spark 2.0 on Cloudera CDH 5.9.3
- Spark 2.3.1 on emr-5.17.0


Here are my tests with outputs. Please use spark-shell to run them.


scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner

scala> val mydataindex = Array(0, 1, 2, 3, 4)
mydataindex: Array[Int] = Array(0, 1, 2, 3, 4)

scala> val mydata = sc.parallelize(for {
 |  x <- mydataindex
 |  y <- Array(123,456,789)
 | } yield (x, y), 100)
mydata: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val rddMyData = mydata.partitionBy(new HashPartitioner(5))
rddMyData: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:26

scala> val rddMyDataPartitions = rddMyData.mapPartitionsWithIndex{
     |   (index, iterator) => {
     |     val myList = iterator.toList
     |     myList.map(x => x + " -> " + index).iterator
     |   }
     | }
rddMyDataPartitions: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:26

scala> // this is expected:

scala> rddMyDataPartitions.take(100)
res1: Array[String] = Array((0,123) -> 0, (0,456) -> 0, (0,789) -> 0, (1,123) -> 1, (1,456) -> 1, (1,789) -> 1, (2,123) -> 2, (2,456) -> 2, (2,789) -> 2, (3,456) -> 3, (3,789) -> 3, (3,123) -> 3, (4,789) -> 4, (4,123) -> 4, (4,456) -> 4)

scala> val dfMyData = mydata.toDF()
dfMyData: org.apache.spark.sql.DataFrame = [_1: int, _2: int]

scala> val dfMyDataRepartitioned = dfMyData.repartition(5,col("_1"))
dfMyDataRepartitioned: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_1: int, _2: int]

scala> dfMyDataRepartitioned.explain(false)
== Physical Plan ==
Exchange hashpartitioning(_1#3, 5)
+- *(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#3, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#4]
   +- Scan ExternalRDDScan[obj#2]

scala> val dfMyDataRepartitionedPartition = dfMyDataRepartitioned.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").count()
dfMyDataRepartitionedPartition: org.apache.spark.sql.DataFrame = [partition_id: int, count: bigint]

scala> // this is unexpected, because one partition got more than one index

scala> dfMyDataRepartitionedPartition.show()
+------------+-----+
|partition_id|count|
+------------+-----+
|           1|    6|
|           3|    3|
|           4|    3|
|           2|    3|
+------------+-----+



I also posted this question on Stack Overflow, but I wanted to reach the
experts here directly as well:
https://stackoverflow.com/questions/54215601/how-is-exchange-hashpartitioning-working-in-spark


Thanks in advance!
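
The likely explanation, offered here as a sketch to verify rather than a
confirmed answer: the DataFrame Exchange hashpartitioning does not partition by
the raw value; it computes pmod(murmur3_hash(_1), 5). The RDD HashPartitioner
instead uses the key's hashCode, which for an Int is the value itself, so keys
0 to 4 map one-to-one only on the RDD side. In spark-shell you can reproduce
the SQL-side assignment, since the `hash` function is the same Murmur3
expression the Exchange applies:

'''
import org.apache.spark.sql.functions.{col, hash, lit, pmod}

// Cast to int so the hash matches the Int keys (_1) in the example above;
// hashing a bigint gives different Murmur3 results.
spark.range(0, 5)
  .select(col("id").cast("int").as("key"))
  .select(col("key"), pmod(hash(col("key")), lit(5)).as("partition_id"))
  .show()
'''

Two of the five keys should land in the same partition_id, matching the skew
(the count of 6) and the empty partition shown above; in other words, this is
Murmur3 behavior, not a bug or a data problem.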







Re: [ANNOUNCE] Announcing Apache Spark 2.2.3

2019-01-16 Thread Hyukjin Kwon
Nice!

On Wed, Jan 16, 2019 at 11:55 AM, Jiaan Geng wrote:

> Glad to hear this.


How to unsubscribe???

2019-01-16 Thread Junior Alvarez
Hi!

I've been sending unsubscribe mails to this address:
user-unsubscr...@spark.apache.org,
for the last few months, and I still don't manage to unsubscribe. Why???

B r
/Junior


Unsubscribe

2019-01-16 Thread Deepak Sahoo
Unsubscribe




How thriftserver load data

2019-01-16 Thread Soheil Pourbafrani
Hi, I want to write an application that loads data from HDFS into tables,
creates a Thrift server, and submits it to the YARN cluster.

The question is how Spark actually loads the data. Does Spark load the data
into memory when the application starts, or does it wait for a query and load
only the data that the query needs?
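
Spark evaluates lazily: nothing is read at startup unless you force it, and
each query re-reads from HDFS unless the table is cached. A sketch of
pre-loading inside the same application and then exposing it over JDBC,
assuming the hive-thriftserver module is on the classpath and the HDFS path is
a placeholder:

'''
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

val spark = SparkSession.builder
  .appName("preloaded-thriftserver")
  .enableHiveSupport()
  .getOrCreate()

spark.read.parquet("hdfs:///data/events").createOrReplaceTempView("events")
spark.sql("CACHE TABLE events")   // eager by default: loads and materializes now

// Expose this session's tables (including the cached temp view) over JDBC/ODBC.
HiveThriftServer2.startWithContext(spark.sqlContext)
'''

Without the CACHE TABLE step, the Thrift server still works, but every incoming
query plans and reads against HDFS at execution time.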