Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-06 Thread kant kodali
Sorry, I meant Spark 2.4 in my previous email.

On Tue, Mar 6, 2018 at 9:15 PM, kant kodali  wrote:

> Hi TD,
>
> I agree; I think we are better off with either a full fix or no fix. I am
> OK with the complete fix being available in master or some branch. I guess
> the solution for me is to just build from source.
>
> On a similar note, I am not finding any JIRA tickets related to full outer
> joins and update mode for, say, Spark 2.3. I wonder how hard it would be to
> implement both of these? It turns out that update mode and full outer joins
> are very useful and required in my case, so I'm just asking.
>
> Thanks!
>
> On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das  > wrote:
>
>> I thought about it.
>> I am not 100% sure whether this fix should go into 2.3.1.
>>
>> There are two parts to this bug fix to enable self-joins.
>>
>> 1. Enabling deduping of leaf logical nodes by extending
>> MultiInstanceRelation
>>   - This is safe to backport into the 2.3 branch as it does not
>> touch production code paths.
>>
>> 2. Fixing attribute rewriting in MicroBatchExecution, when the
>> micro-batch plan is spliced into the streaming plan.
>>   - This touches core production code paths and therefore may not be safe
>> to backport.
>>
>> Part 1 enables self-joins in all but a small fraction of self-join
>> queries. That small fraction can produce incorrect results, and part 2
>> avoids that.
>>
>> So for 2.3.1, we can enable self-joins by merging only part 1, but it can
>> give wrong results in some cases. I think that is strictly worse than no
>> fix.
>>
>> TD
>>
>>
>>
>> On Thu, Feb 22, 2018 at 2:32 PM, kant kodali  wrote:
>>
>>> Hi TD,
>>>
>>> I pulled your commit that is listed on this ticket:
>>> https://issues.apache.org/jira/browse/SPARK-23406
>>> Specifically, I did the following steps, and self-joins work after I
>>> cherry-picked your commit! Good job! I was hoping it would be part of
>>> 2.3.0, but it looks like it is targeted for 2.3.1 :(
>>>
>>> git clone https://github.com/apache/spark.git
>>> cd spark
>>> git fetch
>>> git checkout branch-2.3
>>> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
>>> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
>>> ./build/mvn -DskipTests compile
>>> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
>>>
>>>
>>> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
 Hey,

 Thanks for testing out stream-stream joins and reporting this issue. I
 am going to take a look at this.

 TD



 On Tue, Feb 20, 2018 at 8:20 PM, kant kodali 
 wrote:

> If I change it to the code below, it works. However, I don't believe it
> is the solution I am looking for. I want to be able to do it in raw SQL,
> and moreover, if a user gives a big chained raw Spark SQL join query, I am
> not even sure how to make copies of the DataFrame to achieve the self-join.
> Is there any other way here?
>
>
>
> import org.apache.spark.sql.streaming.Trigger
>
> val jdf = 
> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", 
> "join_test").option("startingOffsets", "earliest").load();
> val jdf1 = 
> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", 
> "join_test").option("startingOffsets", "earliest").load();
>
> jdf.createOrReplaceTempView("table")
> jdf1.createOrReplaceTempView("table1")
>
> val resultdf = spark.sql("select * from table inner join table1 on 
> table.offset=table1.offset")
>
> resultdf.writeStream.outputMode("append").format("console").option("truncate",
>  false).trigger(Trigger.ProcessingTime(1000)).start()
>
>
> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali 
> wrote:
>
>> If I change it to this
>>
>>
>>
>>
>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali 
>> wrote:
>>
>>> Hi All,
>>>
>>> I have the following code
>>>
>>> import org.apache.spark.sql.streaming.Trigger
>>>
>>> val jdf = 
>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>>> "localhost:9092").option("subscribe", 
>>> "join_test").option("startingOffsets", "earliest").load();
>>>
>>> jdf.createOrReplaceTempView("table")
>>>
>>> val resultdf = spark.sql("select * from table as x inner join table as 
>>> y on x.offset=y.offset")
>>>
>>> resultdf.writeStream.outputMode("update").format("console").option("truncate",
>>>  false).trigger(Trigger.ProcessingTime(1000)).start()
>>>
>>> and I get the following exception.
>>>
>>> 

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-06 Thread kant kodali
Hi TD,

I agree; I think we are better off with either a full fix or no fix. I am OK
with the complete fix being available in master or some branch. I guess the
solution for me is to just build from source.

On a similar note, I am not finding any JIRA tickets related to full outer
joins and update mode for, say, Spark 2.3. I wonder how hard it would be to
implement both of these? It turns out that update mode and full outer joins
are very useful and required in my case, so I'm just asking.

Thanks!

On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das 
wrote:

> I thought about it.
> I am not 100% sure whether this fix should go into 2.3.1.
>
> There are two parts to this bug fix to enable self-joins.
>
> 1. Enabling deduping of leaf logical nodes by extending
> MultiInstanceRelation
>   - This is safe to backport into the 2.3 branch as it does not touch
> production code paths.
>
> 2. Fixing attribute rewriting in MicroBatchExecution, when the micro-batch
> plan is spliced into the streaming plan.
>   - This touches core production code paths and therefore may not be safe to
> backport.
>
> Part 1 enables self-joins in all but a small fraction of self-join
> queries. That small fraction can produce incorrect results, and part 2
> avoids that.
>
> So for 2.3.1, we can enable self-joins by merging only part 1, but it can
> give wrong results in some cases. I think that is strictly worse than no
> fix.
>
> TD
>
>
>
> On Thu, Feb 22, 2018 at 2:32 PM, kant kodali  wrote:
>
>> Hi TD,
>>
>> I pulled your commit that is listed on this ticket:
>> https://issues.apache.org/jira/browse/SPARK-23406
>> Specifically, I did the following steps, and self-joins work after I
>> cherry-picked your commit! Good job! I was hoping it would be part of
>> 2.3.0, but it looks like it is targeted for 2.3.1 :(
>>
>> git clone https://github.com/apache/spark.git
>> cd spark
>> git fetch
>> git checkout branch-2.3
>> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
>> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
>> ./build/mvn -DskipTests compile
>> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
>>
>>
>> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Hey,
>>>
>>> Thanks for testing out stream-stream joins and reporting this issue. I
>>> am going to take a look at this.
>>>
>>> TD
>>>
>>>
>>>
>>> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali  wrote:
>>>
 If I change it to the code below, it works. However, I don't believe it
 is the solution I am looking for. I want to be able to do it in raw SQL,
 and moreover, if a user gives a big chained raw Spark SQL join query, I am
 not even sure how to make copies of the DataFrame to achieve the self-join.
 Is there any other way here?



 import org.apache.spark.sql.streaming.Trigger

 val jdf = 
 spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
 "localhost:9092").option("subscribe", 
 "join_test").option("startingOffsets", "earliest").load();
 val jdf1 = 
 spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
 "localhost:9092").option("subscribe", 
 "join_test").option("startingOffsets", "earliest").load();

 jdf.createOrReplaceTempView("table")
 jdf1.createOrReplaceTempView("table1")

 val resultdf = spark.sql("select * from table inner join table1 on 
 table.offset=table1.offset")

 resultdf.writeStream.outputMode("append").format("console").option("truncate",
  false).trigger(Trigger.ProcessingTime(1000)).start()


 On Tue, Feb 20, 2018 at 8:16 PM, kant kodali 
 wrote:

> If I change it to this
>
>
>
>
> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali 
> wrote:
>
>> Hi All,
>>
>> I have the following code
>>
>> import org.apache.spark.sql.streaming.Trigger
>>
>> val jdf = 
>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>> "localhost:9092").option("subscribe", 
>> "join_test").option("startingOffsets", "earliest").load();
>>
>> jdf.createOrReplaceTempView("table")
>>
>> val resultdf = spark.sql("select * from table as x inner join table as y 
>> on x.offset=y.offset")
>>
>> resultdf.writeStream.outputMode("update").format("console").option("truncate",
>>  false).trigger(Trigger.ProcessingTime(1000)).start()
>>
>> and I get the following exception.
>>
>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' 
>> given input columns: [x.value, x.offset, x.key, x.timestampType, 
>> x.topic, x.timestamp, x.partition]; line 1 pos 50;
>> 'Project [*]
>> +- 'Join Inner, ('x.offset = 'y.offset)
>>:- SubqueryAlias x
>> 

Spark StreamingContext Question

2018-03-06 Thread रविशंकर नायर
Hi all,

I understand from the documentation that only one streaming context can be
active in a JVM at the same time.

In an enterprise cluster, then, how can we manage/handle multiple users who
have many different streaming applications, where one may be ingesting data
from Flume, another from Twitter, etc.? Is this not available now?
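
(A minimal sketch, assuming the DStream API, of the per-JVM constraint mentioned above: within a
single driver JVM you reuse the active context via StreamingContext.getActiveOrCreate, while
separate applications submitted by different users each run in their own driver JVM and so each
can have its own context. The app name, batch interval, and source are placeholders.)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Reuse the active StreamingContext in this JVM if one exists, otherwise create one.
val ssc = StreamingContext.getActiveOrCreate(() => {
  val conf = new SparkConf().setAppName("flume-ingest")   // placeholder application name
  new StreamingContext(conf, Seconds(10))                 // placeholder batch interval
})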

Best,
Passion


Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-06 Thread Tathagata Das
I thought about it.
I am not 100% sure whether this fix should go into 2.3.1.

There are two parts to this bug fix to enable self-joins.

1. Enabling deduping of leaf logical nodes by extending
MultiInstanceRelation
  - This is safe to backport into the 2.3 branch as it does not touch
production code paths.

2. Fixing attribute rewriting in MicroBatchExecution, when the micro-batch
plan is spliced into the streaming plan.
  - This touches core production code paths and therefore may not be safe to
backport.

Part 1 enables self-joins in all but a small fraction of self-join queries.
That small fraction can produce incorrect results, and part 2 avoids that.

So for 2.3.1, we can enable self-joins by merging only part 1, but it can
give wrong results in some cases. I think that is strictly worse than no
fix.

TD



On Thu, Feb 22, 2018 at 2:32 PM, kant kodali  wrote:

> Hi TD,
>
> I pulled your commit that is listed on this ticket:
> https://issues.apache.org/jira/browse/SPARK-23406
> Specifically, I did the following steps, and self-joins work after I
> cherry-picked your commit! Good job! I was hoping it would be part of 2.3.0,
> but it looks like it is targeted for 2.3.1 :(
>
> git clone https://github.com/apache/spark.git
> cd spark
> git fetch
> git checkout branch-2.3
> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
> ./build/mvn -DskipTests compile
> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
>
>
> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Hey,
>>
>> Thanks for testing out stream-stream joins and reporting this issue. I am
>> going to take a look at this.
>>
>> TD
>>
>>
>>
>> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali  wrote:
>>
>>> If I change it to the code below, it works. However, I don't believe it
>>> is the solution I am looking for. I want to be able to do it in raw SQL,
>>> and moreover, if a user gives a big chained raw Spark SQL join query, I am
>>> not even sure how to make copies of the DataFrame to achieve the self-join.
>>> Is there any other way here?
>>>
>>>
>>>
>>> import org.apache.spark.sql.streaming.Trigger
>>>
>>> val jdf = 
>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>>> "localhost:9092").option("subscribe", 
>>> "join_test").option("startingOffsets", "earliest").load();
>>> val jdf1 = 
>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>>> "localhost:9092").option("subscribe", 
>>> "join_test").option("startingOffsets", "earliest").load();
>>>
>>> jdf.createOrReplaceTempView("table")
>>> jdf1.createOrReplaceTempView("table1")
>>>
>>> val resultdf = spark.sql("select * from table inner join table1 on 
>>> table.offset=table1.offset")
>>>
>>> resultdf.writeStream.outputMode("append").format("console").option("truncate",
>>>  false).trigger(Trigger.ProcessingTime(1000)).start()
>>>
>>>
>>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali  wrote:
>>>
 If I change it to this




 On Tue, Feb 20, 2018 at 7:52 PM, kant kodali 
 wrote:

> Hi All,
>
> I have the following code
>
> import org.apache.spark.sql.streaming.Trigger
>
> val jdf = 
> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", 
> "join_test").option("startingOffsets", "earliest").load();
>
> jdf.createOrReplaceTempView("table")
>
> val resultdf = spark.sql("select * from table as x inner join table as y 
> on x.offset=y.offset")
>
> resultdf.writeStream.outputMode("update").format("console").option("truncate",
>  false).trigger(Trigger.ProcessingTime(1000)).start()
>
> and I get the following exception.
>
> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given 
> input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, 
> x.timestamp, x.partition]; line 1 pos 50;
> 'Project [*]
> +- 'Join Inner, ('x.offset = 'y.offset)
>:- SubqueryAlias x
>:  +- SubqueryAlias table
>: +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
> offset#32L, timestamp#33, timestampType#34]
>+- SubqueryAlias y
>   +- SubqueryAlias table
>  +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
> 

[Spark CSV DataframeWriter] Quote options for columns on write

2018-03-06 Thread Brandon Geise
My problem is related to the need to have all records in a specific column
quoted when writing a CSV. I assumed that by setting the escapeQuotes option
to false, fields would not have any type of quoting applied, even when the
delimiter exists in the value. Unless I am misunderstanding what the option
actually does, it appears it doesn't work as expected. Can anyone provide some
information about that option? And if the option will not do what I'm trying
to accomplish, is the best solution to set the quote option to \u (my issue
here is that we are sending these CSVs to a partner and I'd like the output to
be more in line with a normal quoted field)?

 

Thanks,

Brandon
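
A minimal sketch of the writer options being discussed, assuming the built-in CSV source in
Spark 2.x; the toy data, column names, and output path are placeholders, and note that quoteAll
applies to every column rather than to a single one:

import spark.implicits._                                 // assumes an existing SparkSession named spark

val df = Seq(("a,b", 1), ("c", 2)).toDF("text", "id")    // toy data; the first value contains the delimiter

df.write
  .option("header", "true")
  .option("escapeQuotes", "false")  // controls escaping of quote characters inside values, not whether quoting happens
  .option("quoteAll", "true")       // force quotes around every field
  .csv("/tmp/out_quoted")           // placeholder output path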



Re: CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread warning

2018-03-06 Thread Junfeng Chen
Spark 2.1.1.

Actually it is a warning rather than an exception, so there is no stack
trace. Just many repetitions of this line:

> CachedKafkaConsumer: CachedKafkaConsumer is not running in
> UninterruptibleThread. It may hang when CachedKafkaConsumer's method are
> interrupted because of KAFKA-1894.



Regard,
Junfeng Chen

On Wed, Mar 7, 2018 at 3:34 AM, Tathagata Das 
wrote:

> Which version of Spark are you using? And can you give us the full stack
> trace of the exception?
>
> On Tue, Mar 6, 2018 at 1:53 AM, Junfeng Chen  wrote:
>
>> I am trying to read Kafka and save the data as a parquet file on HDFS,
>> according to this:
>> https://stackoverflow.com/questions/45827664/read-from-kafka-and-write-to-hdfs-in-parquet
>> 
>>
>>
>> The code is similar to:
>>
>> val df = spark
>>   .read
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>>   .option("subscribe", "topic1")
>>   .load()
>>
>> though I am writing in Java.
>>
>> However, I keep getting the following warning:
>> CachedKafkaConsumer: CachedKafkaConsumer is not running in
>> UninterruptibleThread. It may hang when CachedKafkaConsumer's method are
>> interrupted because of KAFKA-1894.
>>
>> How to solve it? Thanks!
>>
>>
>> Regard,
>> Junfeng Chen
>>
>
>


dependencies conflict in oozie spark action for spark 2

2018-03-06 Thread Lian Jiang
I am using HDP 2.6.4 and have followed
https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_spark-component-guide/content/ch_oozie-spark-action.html
to make Oozie use Spark 2.

After this, I found there are still a bunch of issues:

1. Oozie and Spark try to add the same jars multiple times to the cache.
This is resolved by removing the duplicate jars
from the /user/oozie/share/lib/lib_20180303065325/spark2/ folder.

2. A jar conflict which is not resolved. The exception is below:

18/03/06 23:51:18 ERROR ApplicationMaster: User class threw exception:
java.lang.NoSuchFieldError: USE_DEFAULTS
java.lang.NoSuchFieldError: USE_DEFAULTS
at
com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector.findSerializationInclusion(JacksonAnnotationIntrospector.java:498)
at
com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.findSerializationInclusion(AnnotationIntrospectorPair.java:332)
at
com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.findSerializationInclusion(AnnotationIntrospectorPair.java:332)
at
com.fasterxml.jackson.databind.introspect.BasicBeanDescription.findSerializationInclusion(BasicBeanDescription.java:381)
at
com.fasterxml.jackson.databind.ser.PropertyBuilder.<init>(PropertyBuilder.java:41)
at
com.fasterxml.jackson.databind.ser.BeanSerializerFactory.constructPropertyBuilder(BeanSerializerFactory.java:507)
at
com.fasterxml.jackson.databind.ser.BeanSerializerFactory.findBeanProperties(BeanSerializerFactory.java:558)
at
com.fasterxml.jackson.databind.ser.BeanSerializerFactory.constructBeanSerializer(BeanSerializerFactory.java:361)
at
com.fasterxml.jackson.databind.ser.BeanSerializerFactory.findBeanSerializer(BeanSerializerFactory.java:272)
at
com.fasterxml.jackson.databind.ser.BeanSerializerFactory._createSerializer2(BeanSerializerFactory.java:225)
at
com.fasterxml.jackson.databind.ser.BeanSerializerFactory.createSerializer(BeanSerializerFactory.java:153)
at
com.fasterxml.jackson.databind.SerializerProvider._createUntypedSerializer(SerializerProvider.java:1203)
at
com.fasterxml.jackson.databind.SerializerProvider._createAndCacheUntypedSerializer(SerializerProvider.java:1157)
at
com.fasterxml.jackson.databind.SerializerProvider.findValueSerializer(SerializerProvider.java:481)
at
com.fasterxml.jackson.databind.SerializerProvider.findTypedValueSerializer(SerializerProvider.java:679)
at
com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:107)
at
com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3559)
at
com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2927)
at org.apache.spark.rdd.RDDOperationScope.toJson(RDDOperationScope.scala:52)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:145)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)



My dependencies are:

libraryDependencies += "com.typesafe.scala-logging" %%
"scala-logging-api" % "2.1.2"
libraryDependencies += "com.typesafe.scala-logging" %%
"scala-logging-slf4j" % "2.1.2"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.2.0"
libraryDependencies += "com.typesafe" % "config" % "1.3.2"
libraryDependencies += "org.scalactic" %% "scalactic" % "3.0.4"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.4" % "test"
libraryDependencies += "org.scalamock" %% "scalamock" % "4.1.0" % "test"
libraryDependencies += "com.jsuereth" %% "scala-arm" % "2.0"
libraryDependencies += "com.github.scopt" %% "scopt" % "3.7.0"
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.8"
libraryDependencies += "io.dropwizard.metrics" % "metrics-core" % "4.0.2"
libraryDependencies += "com.typesafe.slick" %% "slick" % "3.2.1"
libraryDependencies += "com.typesafe.slick" %% "slick-hikaricp" % "3.2.1"
libraryDependencies += "com.typesafe.slick" %% "slick-extensions" % "3.0.0"
libraryDependencies += "org.scalaz" %% "scalaz-core" % "7.2.19"
libraryDependencies += "org.json4s" %% "json4s-native" % "3.5.3"
libraryDependencies += "com.softwaremill.retry" %% "retry" % "0.3.0"
libraryDependencies += "org.apache.httpcomponents" % "httpclient" % "4.5.5"
libraryDependencies += "org.apache.httpcomponents" % "httpcore" % "4.4.9"


The sbt dependency tree shows that jackson 2.6.5, coming from spark-core, is
in use. But per
https://stackoverflow.com/questions/36982173/java-lang-nosuchfielderror-use-defaults-thrown-while-validating-json-schema-thr,
Spark is using a jackson version before 2.6, causing "NoSuchFieldError:
USE_DEFAULTS".
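
One common mitigation to try, sketched below: pin jackson to the version spark-core 2.2.0 pulls
in, so nothing older wins inside the application's own dependency tree (2.6.5 is taken from the
dependency tree mentioned above). If the older jackson is actually coming from the Oozie sharelib
at runtime rather than from this build, pinning in sbt will not be enough, and shading the jar
(for example with sbt-assembly) or cleaning the sharelib would be needed instead.

dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.6.5"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.5"
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-annotations" % "2.6.5"
dependencyOverrides += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.6.5"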

I have:

1. succeeded in running the same application through spark-submit.

2. made sure the Spark dependencies are 2.2.0 to be consistent with that in

Dynamic allocation Spark Streaming

2018-03-06 Thread KhajaAsmath Mohammed
Hi,

I have enabled dynamic allocation for a Spark Streaming application, but the
number of containers always shows as 2. Is there a way to get more while the
job is running?
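
For reference, a minimal sketch of the settings usually involved in executor dynamic allocation
(the values are placeholders, and this assumes the external shuffle service is available on the
cluster). For DStream jobs there is also a separate spark.streaming.dynamicAllocation.* mechanism,
so which one applies depends on how the job is written.

// Typically passed as --conf flags on spark-submit; shown here as SparkConf settings.
val conf = new org.apache.spark.SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")              // external shuffle service must be running
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "20")         // placeholder upper bound
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")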




Thanks,
Asmath


Re: CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread warning

2018-03-06 Thread Tathagata Das
Which version of Spark are you using? And can you give us the full stack
trace of the exception?

On Tue, Mar 6, 2018 at 1:53 AM, Junfeng Chen  wrote:

> I am trying to read Kafka and save the data as a parquet file on HDFS,
> according to this:
> https://stackoverflow.com/questions/45827664/read-from-kafka-and-write-to-hdfs-in-parquet
> 
>
>
> The code is similar to:
>
> val df = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>   .option("subscribe", "topic1")
>   .load()
>
> though I am writing in Java.
>
> However, I keep getting the following warning:
> CachedKafkaConsumer: CachedKafkaConsumer is not running in
> UninterruptibleThread. It may hang when CachedKafkaConsumer's method are
> interrupted because of KAFKA-1894.
>
> How to solve it? Thanks!
>
>
> Regard,
> Junfeng Chen
>


Re: OutOfDirectMemoryError for Spark 2.2

2018-03-06 Thread Chawla,Sumit
No, this is the only stack trace I get. I have tried DEBUG but didn't
notice much of a change in the logs.

Yes, I have tried bumping MaxDirectMemorySize to get rid of this error.
It does work if I throw 4G+ of memory at it. However, I am trying to
understand this behavior so that I can set this number to an appropriate
value.
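
For reference, a minimal sketch of how that limit is usually raised for both driver and executors
(2g is a placeholder; the driver option normally has to be passed on the spark-submit command
line, since the driver JVM is already running by the time application code executes):

// Usually supplied as --conf flags to spark-submit rather than set programmatically.
val conf = new org.apache.spark.SparkConf()
  .set("spark.driver.extraJavaOptions", "-XX:MaxDirectMemorySize=2g")
  .set("spark.executor.extraJavaOptions", "-XX:MaxDirectMemorySize=2g")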

Regards
Sumit Chawla


On Tue, Mar 6, 2018 at 8:07 AM, Vadim Semenov  wrote:

> Do you have a trace? i.e. what's the source of `io.netty.*` calls?
>
> And have you tried bumping `-XX:MaxDirectMemorySize`?
>
> On Tue, Mar 6, 2018 at 12:45 AM, Chawla,Sumit 
> wrote:
>
>> Hi All
>>
>> I have a job which processes a large dataset. All items in the dataset
>> are unrelated. To save on cluster resources, I process these items in
>> chunks. Since the chunks are independent of each other, I start and shut
>> down the Spark context for each chunk. This allows me to keep the DAG
>> smaller and not retry the entire DAG in case of failures. This mechanism
>> used to work fine with Spark 1.6. Now, as we have moved to 2.2, the job
>> started failing with an OutOfDirectMemoryError.
>>
>> 2018-03-03 22:00:59,687 WARN  [rpc-server-48-1]
>> server.TransportChannelHandler 
>> (TransportChannelHandler.java:exceptionCaught(78))
>> - Exception in connection from /10.66.73.27:60374
>>
>> io.netty.util.internal.OutOfDirectMemoryError: failed to allocate
>> 8388608 byte(s) of direct memory (used: 1023410176, max: 1029177344)
>>
>> at io.netty.util.internal.PlatformDependent.incrementMemoryCoun
>> ter(PlatformDependent.java:506)
>>
>> at io.netty.util.internal.PlatformDependent.allocateDirectNoCle
>> aner(PlatformDependent.java:460)
>>
>> at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolAre
>> na.java:701)
>>
>> at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:690)
>>
>> at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:237)
>>
>> at io.netty.buffer.PoolArena.allocate(PoolArena.java:213)
>>
>> at io.netty.buffer.PoolArena.allocate(PoolArena.java:141)
>>
>> at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(Poole
>> dByteBufAllocator.java:271)
>>
>> at io.netty.buffer.AbstractByteBufAllocator.directBuffer(Abstra
>> ctByteBufAllocator.java:177)
>>
>> at io.netty.buffer.AbstractByteBufAllocator.directBuffer(Abstra
>> ctByteBufAllocator.java:168)
>>
>> at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractBy
>> teBufAllocator.java:129)
>>
>> at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.
>> allocate(AdaptiveRecvByteBufAllocator.java:104)
>>
>> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.
>> read(AbstractNioByteChannel.java:117)
>>
>> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEven
>> tLoop.java:564)
>>
>> I got some clues about what is causing this from
>> https://github.com/netty/netty/issues/6343. However, I am not able to make
>> the numbers add up to explain what is causing 1 GB of direct memory to
>> fill up.
>>
>> Output from jmap
>>
>>
>> 7: 22230 1422720 io.netty.buffer.PoolSubpage
>>
>> 12: 1370 804640 io.netty.buffer.PoolSubpage[]
>>
>> 41: 3600 144000 io.netty.buffer.PoolChunkList
>>
>> 98: 1440 46080 io.netty.buffer.PoolThreadCache$SubPageMemoryRegionCache
>>
>> 113: 300 40800 io.netty.buffer.PoolArena$HeapArena
>>
>> 114: 300 40800 io.netty.buffer.PoolArena$DirectArena
>>
>> 192: 198 15840 io.netty.buffer.PoolChunk
>>
>> 274: 120 8320 io.netty.buffer.PoolThreadCache$MemoryRegionCache[]
>>
>> 406: 120 3840 io.netty.buffer.PoolThreadCache$NormalMemoryRegionCache
>>
>> 422: 72 3552 io.netty.buffer.PoolArena[]
>>
>> 458: 30 2640 io.netty.buffer.PooledUnsafeDirectByteBuf
>>
>> 500: 36 2016 io.netty.buffer.PooledByteBufAllocator
>>
>> 529: 32 1792 io.netty.buffer.UnpooledUnsafeHeapByteBuf
>>
>> 589: 20 1440 io.netty.buffer.PoolThreadCache
>>
>> 630: 37 1184 io.netty.buffer.EmptyByteBuf
>>
>> 703: 36 864 io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache
>>
>> 852: 22 528 io.netty.buffer.AdvancedLeakAwareByteBuf
>>
>> 889: 10 480 io.netty.buffer.SlicedAbstractByteBuf
>>
>> 917: 8 448 io.netty.buffer.UnpooledHeapByteBuf
>>
>> 1018: 20 320 io.netty.buffer.PoolThreadCache$1
>>
>> 1305: 4 128 io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
>>
>> 1404: 1 80 io.netty.buffer.PooledUnsafeHeapByteBuf
>>
>> 1473: 3 72 io.netty.buffer.PoolArena$SizeClass
>>
>> 1529: 1 64 io.netty.buffer.AdvancedLeakAwareCompositeByteBuf
>>
>> 1541: 2 64 io.netty.buffer.CompositeByteBuf$Component
>>
>> 1568: 1 56 io.netty.buffer.CompositeByteBuf
>>
>> 1896: 1 32 io.netty.buffer.PoolArena$SizeClass[]
>>
>> 2042: 1 24 io.netty.buffer.PooledUnsafeDirectByteBuf$1
>>
>> 2046: 1 24 io.netty.buffer.UnpooledByteBufAllocator
>>
>> 2051: 1 24 io.netty.buffer.PoolThreadCache$MemoryRegionCache$1
>>
>> 2078: 1 24 io.netty.buffer.PooledHeapByteBuf$1
>>
>> 2135: 1 24 io.netty.buffer.PooledUnsafeHeapByteBuf$1
>>
>> 2302: 1 16 io.netty.buffer.ByteBufUtil$1
>>
>> 2769: 1 16 

Re: OutOfDirectMemoryError for Spark 2.2

2018-03-06 Thread Vadim Semenov
Do you have a trace? i.e. what's the source of `io.netty.*` calls?

And have you tried bumping `-XX:MaxDirectMemorySize`?

On Tue, Mar 6, 2018 at 12:45 AM, Chawla,Sumit 
wrote:

> Hi All
>
> I have a job which processes a large dataset. All items in the dataset
> are unrelated. To save on cluster resources, I process these items in
> chunks. Since the chunks are independent of each other, I start and shut
> down the Spark context for each chunk. This allows me to keep the DAG
> smaller and not retry the entire DAG in case of failures. This mechanism
> used to work fine with Spark 1.6. Now, as we have moved to 2.2, the job
> started failing with an OutOfDirectMemoryError.
>
> 2018-03-03 22:00:59,687 WARN  [rpc-server-48-1]
> server.TransportChannelHandler 
> (TransportChannelHandler.java:exceptionCaught(78))
> - Exception in connection from /10.66.73.27:60374
>
> io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 8388608
> byte(s) of direct memory (used: 1023410176, max: 1029177344)
>
> at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(
> PlatformDependent.java:506)
>
> at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(
> PlatformDependent.java:460)
>
> at io.netty.buffer.PoolArena$DirectArena.allocateDirect(
> PoolArena.java:701)
>
> at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:690)
>
> at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:237)
>
> at io.netty.buffer.PoolArena.allocate(PoolArena.java:213)
>
> at io.netty.buffer.PoolArena.allocate(PoolArena.java:141)
>
> at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(
> PooledByteBufAllocator.java:271)
>
> at io.netty.buffer.AbstractByteBufAllocator.directBuffer(
> AbstractByteBufAllocator.java:177)
>
> at io.netty.buffer.AbstractByteBufAllocator.directBuffer(
> AbstractByteBufAllocator.java:168)
>
> at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(
> AbstractByteBufAllocator.java:129)
>
> at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(
> AdaptiveRecvByteBufAllocator.java:104)
>
> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(
> AbstractNioByteChannel.java:117)
>
> at io.netty.channel.nio.NioEventLoop.processSelectedKey(
> NioEventLoop.java:564)
>
> I got some clues about what is causing this from
> https://github.com/netty/netty/issues/6343. However, I am not able to make
> the numbers add up to explain what is causing 1 GB of direct memory to
> fill up.
>
> Output from jmap
>
>
> 7: 22230 1422720 io.netty.buffer.PoolSubpage
>
> 12: 1370 804640 io.netty.buffer.PoolSubpage[]
>
> 41: 3600 144000 io.netty.buffer.PoolChunkList
>
> 98: 1440 46080 io.netty.buffer.PoolThreadCache$SubPageMemoryRegionCache
>
> 113: 300 40800 io.netty.buffer.PoolArena$HeapArena
>
> 114: 300 40800 io.netty.buffer.PoolArena$DirectArena
>
> 192: 198 15840 io.netty.buffer.PoolChunk
>
> 274: 120 8320 io.netty.buffer.PoolThreadCache$MemoryRegionCache[]
>
> 406: 120 3840 io.netty.buffer.PoolThreadCache$NormalMemoryRegionCache
>
> 422: 72 3552 io.netty.buffer.PoolArena[]
>
> 458: 30 2640 io.netty.buffer.PooledUnsafeDirectByteBuf
>
> 500: 36 2016 io.netty.buffer.PooledByteBufAllocator
>
> 529: 32 1792 io.netty.buffer.UnpooledUnsafeHeapByteBuf
>
> 589: 20 1440 io.netty.buffer.PoolThreadCache
>
> 630: 37 1184 io.netty.buffer.EmptyByteBuf
>
> 703: 36 864 io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache
>
> 852: 22 528 io.netty.buffer.AdvancedLeakAwareByteBuf
>
> 889: 10 480 io.netty.buffer.SlicedAbstractByteBuf
>
> 917: 8 448 io.netty.buffer.UnpooledHeapByteBuf
>
> 1018: 20 320 io.netty.buffer.PoolThreadCache$1
>
> 1305: 4 128 io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
>
> 1404: 1 80 io.netty.buffer.PooledUnsafeHeapByteBuf
>
> 1473: 3 72 io.netty.buffer.PoolArena$SizeClass
>
> 1529: 1 64 io.netty.buffer.AdvancedLeakAwareCompositeByteBuf
>
> 1541: 2 64 io.netty.buffer.CompositeByteBuf$Component
>
> 1568: 1 56 io.netty.buffer.CompositeByteBuf
>
> 1896: 1 32 io.netty.buffer.PoolArena$SizeClass[]
>
> 2042: 1 24 io.netty.buffer.PooledUnsafeDirectByteBuf$1
>
> 2046: 1 24 io.netty.buffer.UnpooledByteBufAllocator
>
> 2051: 1 24 io.netty.buffer.PoolThreadCache$MemoryRegionCache$1
>
> 2078: 1 24 io.netty.buffer.PooledHeapByteBuf$1
>
> 2135: 1 24 io.netty.buffer.PooledUnsafeHeapByteBuf$1
>
> 2302: 1 16 io.netty.buffer.ByteBufUtil$1
>
> 2769: 1 16 io.netty.util.internal.__matchers__.io.netty.buffer.
> ByteBufMatcher
>
>
>
> My driver machine has 32 CPUs, and as of now I have 15 machines in my
> cluster. The error happens on processing the 5th or 6th chunk. I suspect
> the error depends on the number of executors and would happen earlier if we
> added more executors.
>
>
> I am trying to come up with an explanation of what is filling up the direct
> memory and how to quantify it as a factor of the number of executors. Our
> cluster is a shared cluster, and we need to understand how much driver
> memory to allocate for most of the jobs.
>
>
>
>
>
> 

Distributed Nature of Spark and Time Series Temporal Dependence

2018-03-06 Thread arshanvit
Hi All,

I am new to Spark and I am trying to use forecasting models on time-series
data. As per my understanding, Spark DataFrames are distributed collections of
data. This distributed nature means that chunks of data are not dependent on
each other and may be processed separately and in parallel.

To mitigate this for time-series data and for accurate prediction, I thought
that instead of making a DataFrame from a large amount of data, I would divide
it into test and train sets in such a way that the train and test data do not
get distributed among nodes and are processed in one go.

If this approach is possible, how can I ensure that the data does not get
distributed, and how should I approach it?
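
A minimal sketch of one way to approach this, assuming a timestamp column named ts and a fixed
cutoff (both placeholders): split by time rather than randomly so training never sees "future"
rows, and, if a model needs to consume one series sequentially, pull it onto a single partition in
time order (this assumes the series fits comfortably on one executor).

import org.apache.spark.sql.functions._

val df = spark.read.parquet("/data/series")                     // placeholder input
val cutoff = java.sql.Timestamp.valueOf("2018-01-01 00:00:00")  // placeholder split point

val train = df.filter(col("ts") <  lit(cutoff))
val test  = df.filter(col("ts") >= lit(cutoff))

// Bring the training series onto one partition, ordered by time, so it is processed in one go.
val trainOrdered = train.repartition(1).sortWithinPartitions("ts")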



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread warning

2018-03-06 Thread Junfeng Chen
I am trying to read Kafka and save the data as a parquet file on HDFS,
according to this:
https://stackoverflow.com/questions/45827664/read-from-kafka-and-write-to-hdfs-in-parquet



The code is similar to:

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

though I am writing in Java.

However, I keep getting the following warning:
CachedKafkaConsumer: CachedKafkaConsumer is not running in
UninterruptibleThread. It may hang when CachedKafkaConsumer's method are
interrupted because of KAFKA-1894.

How to solve it? Thanks!


Regard,
Junfeng Chen
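
For reference, a minimal sketch of the streaming variant of the snippet above (readStream plus a
parquet sink), with placeholder output and checkpoint paths; whether this changes the warning has
not been verified here:

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("parquet")
  .option("path", "hdfs:///data/topic1")                      // placeholder output directory
  .option("checkpointLocation", "hdfs:///checkpoints/topic1") // placeholder checkpoint location
  .start()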


Re: Properly stop applications or jobs within the application

2018-03-06 Thread bsikander
It seems to be related to this issue from Kafka
https://issues.apache.org/jira/browse/KAFKA-1894



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org