Re: Turning off Jetty Http Options Method

2019-04-30 Thread Ankit Jain
I agree that for an OSS project, all the endpoints that can be called are
already publicly available.

https://security.stackexchange.com/questions/138567/why-should-the-options-method-not-be-allowed-on-an-http-server
has a couple of good reasons though.

"An essential part of security is to reduce the attack surface by removing
any functionality which is not needed. Usually this is also functionality
which is less well tested and thus might be a vector for unexpected
attacks."

The second answer is more ominous:
"Others have pointed out that you want to limit your attack surface, and to
be aware that some Ajax sites legitimately use OPTIONS. Anyway, I wanted to
share a recent experience with you.

We had tested a site and discovered it was vulnerable to executable file
upload. Roughly speaking, you could upload a JSP file as your profile
picture, then execute the JSP file and take control of the server.

The client's first attempt at a fix blocked fetching the JSP with a GET
request. However, we discovered it was still possible to execute the JSP
using an OPTIONS request. You don't get the JSP output - but it's easy to
code the JSP to connect back with an out-of-band mechanism.

In this case, allowing OPTIONS allowed a remote server compromise."

Thanks

Ankit

On Tue, Apr 30, 2019 at 7:35 PM  wrote:

> If this is correct: “This method exposes what all methods are supported
> by the endpoint”, I really don’t understand how that’s a security
> vulnerability considering the OSS nature of this project. Are you adding
> new endpoints to this webserver?
>
>
>
> More info about info/other methods :
> https://security.stackexchange.com/questions/21413/how-to-exploit-http-methods
>
>
>
>
>
> *From:* Ankit Jain 
> *Sent:* Tuesday, April 30, 2019 7:25 PM
> *To:* user@spark.apache.org; d...@spark.apache.org
> *Subject:* Re: Turning off Jetty Http Options Method
>
>
>
> + dev@spark.apache.org
>
>
>
> On Tue, Apr 30, 2019 at 4:23 PM Ankit Jain 
> wrote:
>
> Aah - actually found https://issues.apache.org/jira/browse/SPARK-18664 -
> "Don't respond to HTTP OPTIONS in HTTP-based UIs"
>
>
>
> Does anyone know if this can be prioritized?
>
>
>
> Thanks
>
> Ankit
>
>
>
> On Tue, Apr 30, 2019 at 1:31 PM Ankit Jain 
> wrote:
>
> Hi Fellow Spark users,
>
> We are using Spark 2.3.0 and our security team is reporting a violation that
> Spark allows the HTTP OPTIONS method to work (this method exposes which
> methods are supported by the endpoint, which could be exploited by a hacker).
>
> This method is on the Jetty web server; I see Spark uses Jetty for the web UI
> and some internal communication as well.
>
> For the Spark UI, we are planning to write a javax servlet filter, build a jar,
> and add it to the Spark libs so that the OPTIONS method gets no response. We
> don't have a clean solution for the internal Jetty server that is used as a
> file server, though.
>
> It would be nice if Spark itself didn't allow the OPTIONS method to work,
> similar to what was done for TRACE -
> https://issues.apache.org/jira/browse/SPARK-5983
>
> What do you guys think? Does the community feel this should be added directly
> to the Spark code?
>
> Also, if there is a later version of Spark where this has been addressed,
> please let us know too.
>
>
>
> --
>
> Thanks & Regards,
>
> Ankit.
>
>
>
>
> --
>
> Thanks & Regards,
>
> Ankit.
>
>
>
>
> --
>
> Thanks & Regards,
>
> Ankit.
>


-- 
Thanks & Regards,
Ankit.


spark on kubernetes driver pod phase changed from running to pending and starts another container in pod

2019-04-30 Thread zyfo2
I'm using spark-on-kubernetes to submit a Spark app to Kubernetes.
Most of the time it runs smoothly, but sometimes I see after submitting that
the driver pod phase changes from Running to Pending and another container is
started in the pod, even though the first container exited successfully.

The driver log shows nothing special; the first container ran successfully and
exited. The second one failed because the job checks the file path of the output
and returns an error if it already exists. What I can see from the log is that
the second container starts shortly after the first one exited. I attached the
driver log files.

I use the standard spark-submit to kubernetes like:

/opt/spark/bin/spark-submit --conf spark.driver.bindAddress=10.244.12.106
--deploy-mode client --properties-file /opt/spark/conf/spark.properties
--class com..cloud.mf.trainer.Submit spark-internal --ak
970f5e4c-7171-4c61-603e-f101b65a573b --tracking_server_url
http://10.155.197.12:8080 --graph
hdfs://yq01-m12-ai2b-service02.yq01..com:9000/project/62247e3a-e322-4456-6387-a66e9490652e/exp/62c37ae9-12aa-43f7-671f-d187e1bf1f84/graph/08e1dfad-c272-45ca-4201-1a8bc691a56e/meta/node1555661669082/graph.json
--sk 56305f9f-b755-4b42-4218-592555f5c4a8 --mode train


My env:

Kubernetes version (use kubectl version):
v1.10.0

OS (e.g: cat /etc/os-release):
CentOS-7

Kernel (e.g. uname -a):
4.17.11-1.el7.elrepo.x86_64

Spark-2.4.0

I uploaded the driver logs, the kubectl describe pod output, and the spark-submit
output:
driver-pod-logs.zip
describe-pod.log
spark-submit-output.log


Any help appreciated. Thank you.




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: Turning off Jetty Http Options Method

2019-04-30 Thread email
If this is correct: “This method exposes what all methods are supported by the
endpoint”, I really don’t understand how that’s a security vulnerability
considering the OSS nature of this project. Are you adding new endpoints to
this webserver?

 

More info about this and other methods:
https://security.stackexchange.com/questions/21413/how-to-exploit-http-methods

 

 

From: Ankit Jain  
Sent: Tuesday, April 30, 2019 7:25 PM
To: user@spark.apache.org; d...@spark.apache.org
Subject: Re: Turning off Jetty Http Options Method

 

+ d...@spark.apache.org  

 

On Tue, Apr 30, 2019 at 4:23 PM Ankit Jain <ankitjain@gmail.com> wrote:

Aah - actually found https://issues.apache.org/jira/browse/SPARK-18664 - "Don't 
respond to HTTP OPTIONS in HTTP-based UIs"

 

Does anyone know if this can be prioritized?

 

Thanks

Ankit

 

On Tue, Apr 30, 2019 at 1:31 PM Ankit Jain <ankitjain@gmail.com> wrote:

Hi Fellow Spark users,

We are using Spark 2.3.0 and our security team is reporting a violation that Spark 
allows the HTTP OPTIONS method to work (this method exposes which methods are 
supported by the endpoint, which could be exploited by a hacker).

This method is on the Jetty web server; I see Spark uses Jetty for the web UI and 
some internal communication as well.

For the Spark UI, we are planning to write a javax servlet filter, build a jar, and add 
it to the Spark libs so that the OPTIONS method gets no response. We don't have a clean 
solution for the internal Jetty server that is used as a file server, though.

It would be nice if Spark itself didn't allow the OPTIONS method to work, similar to 
what was done for TRACE - https://issues.apache.org/jira/browse/SPARK-5983

What do you guys think? Does the community feel this should be added directly to the 
Spark code?

Also, if there is a later version of Spark where this has been addressed, please let 
us know too.

 

-- 

Thanks & Regards,

Ankit.




 

-- 

Thanks & Regards,

Ankit.




 

-- 

Thanks & Regards,

Ankit.



RE: [EXT] handling skewness issues

2019-04-30 Thread email
Please share the links if they are publicly available. Otherwise, please share 
the names of the talks. Thank you.

 

From: Jules Damji  
Sent: Monday, April 29, 2019 8:04 PM
To: Michael Mansour 
Cc: rajat kumar ; user@spark.apache.org
Subject: Re: [EXT] handling skewness issues

 

Yes, indeed! A few talks in the developer and deep-dive sessions address the data skew 
issue and how to deal with it.

 

I shall let the group know when the talk sessions are available.

 

Cheers 

Jules

 

Sent from my iPhone

Pardon the dumb thumb typos :)


On Apr 29, 2019, at 2:13 PM, Michael Mansour <michael_mans...@symantec.com> wrote:

There were recently some fantastic talks about this at the SparkSummit 
conference in San Francisco.  I suggest you check out the SparkSummit YouTube 
channel after May 9th for a deep dive into this topic. 

 

From: rajat kumar <kumar.rajat20...@gmail.com>
Date: Monday, April 29, 2019 at 9:34 AM
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: [EXT] handling skewness issues

 

Hi All, 

 

How do I overcome skewness issues in Spark?

 

I read that we can add some randomness to the key column before the join and remove 
that random part after the join.

 

Is there any better way? The above method seems like a workaround.
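
To make sure I'm describing it right, the salting workaround I mean looks roughly
like this sketch (paste-able into spark-shell; the data and column names are made up):

import org.apache.spark.sql.functions._
import spark.implicits._

// Made-up data: a skewed large side and a small lookup side, both keyed by "key"
val largeDF = Seq(("hot", 1), ("hot", 2), ("hot", 3), ("cold", 4)).toDF("key", "value")
val smallDF = Seq(("hot", "H"), ("cold", "C")).toDF("key", "label")

val numSalts = 10

// Large side: append a random salt 0..numSalts-1 to the join key
val largeSalted = largeDF.withColumn("salted_key",
  concat_ws("_", col("key"), (rand() * numSalts).cast("int")))

// Small side: replicate each row once per salt so every salted key has a match
val smallSalted = smallDF
  .crossJoin(spark.range(numSalts).toDF("salt"))
  .withColumn("salted_key", concat_ws("_", col("key"), col("salt")))
  .drop("key", "salt")

// Join on the salted key, then drop the helper column again
val joined = largeSalted.join(smallSalted, "salted_key").drop("salted_key")
joined.show()

The salt spreads the hot key over numSalts partitions, at the cost of duplicating
the small side numSalts times.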

 

thanks 

rajat



Re: Turning off Jetty Http Options Method

2019-04-30 Thread Ankit Jain
+ d...@spark.apache.org

On Tue, Apr 30, 2019 at 4:23 PM Ankit Jain  wrote:

> Aah - actually found https://issues.apache.org/jira/browse/SPARK-18664 -
> "Don't respond to HTTP OPTIONS in HTTP-based UIs"
>
> Does anyone know if this can be prioritized?
>
> Thanks
> Ankit
>
> On Tue, Apr 30, 2019 at 1:31 PM Ankit Jain 
> wrote:
>
>> Hi Fellow Spark users,
>> We are using Spark 2.3.0 and our security team is reporting a violation that
>> Spark allows the HTTP OPTIONS method to work (this method exposes which
>> methods are supported by the endpoint, which could be exploited by a hacker).
>>
>> This method is on the Jetty web server; I see Spark uses Jetty for the web UI
>> and some internal communication as well.
>>
>> For the Spark UI, we are planning to write a javax servlet filter, build a jar,
>> and add it to the Spark libs so that the OPTIONS method gets no response. We
>> don't have a clean solution for the internal Jetty server that is used as a
>> file server, though.
>>
>> It would be nice if Spark itself didn't allow the OPTIONS method to work,
>> similar to what was done for TRACE -
>> https://issues.apache.org/jira/browse/SPARK-5983
>>
>> What do you guys think? Does the community feel this should be added
>> directly to the Spark code?
>>
>> Also, if there is a later version of Spark where this has been addressed,
>> please let us know too.
>>
>> --
>> Thanks & Regards,
>> Ankit.
>>
>
>
> --
> Thanks & Regards,
> Ankit.
>


-- 
Thanks & Regards,
Ankit.


Re: Turning off Jetty Http Options Method

2019-04-30 Thread Ankit Jain
Aah - actually found https://issues.apache.org/jira/browse/SPARK-18664 -
"Don't respond to HTTP OPTIONS in HTTP-based UIs"

Does anyone know if this can be prioritized?

Thanks
Ankit

On Tue, Apr 30, 2019 at 1:31 PM Ankit Jain  wrote:

> Hi Fellow Spark users,
> We are using Spark 2.3.0 and our security team is reporting a violation that
> Spark allows the HTTP OPTIONS method to work (this method exposes which
> methods are supported by the endpoint, which could be exploited by a hacker).
>
> This method is on the Jetty web server; I see Spark uses Jetty for the web UI
> and some internal communication as well.
>
> For the Spark UI, we are planning to write a javax servlet filter, build a jar,
> and add it to the Spark libs so that the OPTIONS method gets no response. We
> don't have a clean solution for the internal Jetty server that is used as a
> file server, though.
>
> It would be nice if Spark itself didn't allow the OPTIONS method to work,
> similar to what was done for TRACE -
> https://issues.apache.org/jira/browse/SPARK-5983
>
> What do you guys think? Does the community feel this should be added directly
> to the Spark code?
>
> Also, if there is a later version of Spark where this has been addressed,
> please let us know too.
>
> --
> Thanks & Regards,
> Ankit.
>


-- 
Thanks & Regards,
Ankit.


Re: Issue with offset management using Spark on Dataproc

2019-04-30 Thread Shixiong(Ryan) Zhu
I recommend you use Structured Streaming, as it has a patch that can
work around this issue: https://issues.apache.org/jira/browse/SPARK-26267
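
For reference, the Kafka source in Structured Streaming is declared roughly like
this (broker and topic are placeholders, `spark` is the SparkSession, and the
failOnDataLoss option is optional):

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
  .option("subscribe", "demo.topic")                 // placeholder
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false") // optional: log instead of failing the query when offsets fall out of range
  .load()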

Best Regards,
Ryan


On Tue, Apr 30, 2019 at 3:34 PM Shixiong(Ryan) Zhu 
wrote:

> There is a known issue that Kafka may return a wrong offset even if there
> is no reset happening: https://issues.apache.org/jira/browse/KAFKA-7703
>
> Best Regards,
> Ryan
>
>
> On Tue, Apr 30, 2019 at 10:41 AM Austin Weaver 
> wrote:
>
>> @deng - There was a short erroneous period where 2 streams reading from the
>> same topic and group id were running at the same time. We saw errors from
>> this and stopped the extra stream. That being said, I would think that
>> auto.offset.reset would kick in regardless, since the documentation says it
>> kicks in if there is no existing current offset or the current offset no
>> longer exists on the kafka topic? Moreover, that doesn't explain the fact
>> that Spark logs that it is on one offset for that partition (5553330) - and
>> then immediately errors out trying to read the old offset (4544296) that no
>> longer exists?
>>
>> @Akshay - I am using Spark Streaming (D-streams). Here is a snippet of the
>> kafka consumer configuration I am using (some fields redacted) -
>>
>> kakaConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
>> kakaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "");
>> kakaConsumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
>> kakaConsumerProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
>>  RoundRobinAssignor.class.getName());
>> kakaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
>> StringDeserializer.class);
>> kakaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
>> StringDeserializer.class);
>> kakaConsumerProperties.put("auto.offset.reset", "earliest");
>> kakaConsumerProperties.put("sasl.mechanism", "PLAIN");
>> kakaConsumerProperties.put("sasl.jaas.config", "security.protocol");
>> kakaConsumerProperties.put("security.protocol", "");
>>
>> and I'm using LocationStrategies.PreferConsistent()
>>
>> Thanks
>>
>> On Tue, Apr 30, 2019 at 5:56 AM Akshay Bhardwaj <
>> akshay.bhardwaj1...@gmail.com> wrote:
>>
>>> Hi Austin,
>>>
>>> Are you using Spark Streaming or Structured Streaming?
>>>
>>> For better understanding, could you also provide sample code/config
>>> params for your spark-kafka connector for the said streaming job?
>>>
>>>
>>> Akshay Bhardwaj
>>> +91-97111-33849
>>>
>>>
>>> On Mon, Apr 29, 2019 at 10:34 PM Austin Weaver 
>>> wrote:
>>>
 Hey guys, relatively new Spark Dev here and i'm seeing some kafka
 offset issues and was wondering if you guys could help me out.

 I am currently running a spark job on Dataproc and am getting errors
 trying to re-join a group and read data from a kafka topic. I have done
 some digging and am not sure what the issue is. I have
 auto.offset.reset set to earliest, so it should be reading from the
 earliest available non-committed offset, and initially my spark logs look
 like this:

 19/04/29 16:30:30 INFO
 org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
 clientId=consumer-1, groupId=demo-group] Resetting offset for
 partition demo.topic-11 to offset 5553330.
 19/04/29 16:30:30 INFO
 org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
 clientId=consumer-1, groupId=demo-group] Resetting offset for
 partition demo.topic-2 to offset 553.
 19/04/29 16:30:30 INFO
 org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
 clientId=consumer-1, groupId=demo-group] Resetting offset for
 partition demo.topic-3 to offset 484.
 19/04/29 16:30:30 INFO
 org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
 clientId=consumer-1, groupId=demo-group] Resetting offset for
 partition demo.topic-4 to offset 586.
 19/04/29 16:30:30 INFO
 org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
 clientId=consumer-1, groupId=demo-group] Resetting offset for
 partition demo.topic-5 to offset 502.
 19/04/29 16:30:30 INFO
 org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
 clientId=consumer-1, groupId=demo-group] Resetting offset for
 partition demo.topic-6 to offset 561.
 19/04/29 16:30:30 INFO
 org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
 clientId=consumer-1, groupId=demo-group] Resetting offset for
 partition demo.topic-7 to offset 542.```

 But then on the very next line I get an error trying to read from a
 nonexistent offset on the server (you can see that the offset for the
 partition differs from the one listed above, so I have no idea why it would
 be attempting to read from that offset); here is the error on the next line:

 org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 

Re: Issue with offset management using Spark on Dataproc

2019-04-30 Thread Shixiong(Ryan) Zhu
There is a known issue that Kafka may return a wrong offset even if there
is no reset happening: https://issues.apache.org/jira/browse/KAFKA-7703

Best Regards,
Ryan


On Tue, Apr 30, 2019 at 10:41 AM Austin Weaver  wrote:

> @deng - There was a short erroneous period where 2 streams reading from the
> same topic and group id were running at the same time. We saw errors from
> this and stopped the extra stream. That being said, I would think that
> auto.offset.reset would kick in regardless, since the documentation says it
> kicks in if there is no existing current offset or the current offset no
> longer exists on the kafka topic? Moreover, that doesn't explain the fact
> that Spark logs that it is on one offset for that partition (5553330) - and
> then immediately errors out trying to read the old offset (4544296) that no
> longer exists?
>
> @Akshay - I am using Spark Streaming (D-streams). Here is a snippet of the
> kafka consumer configuration I am using (some fields redacted) -
>
> kakaConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
> kakaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "");
> kakaConsumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
> kakaConsumerProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
>  RoundRobinAssignor.class.getName());
> kakaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> StringDeserializer.class);
> kakaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> StringDeserializer.class);
> kakaConsumerProperties.put("auto.offset.reset", "earliest");
> kakaConsumerProperties.put("sasl.mechanism", "PLAIN");
> kakaConsumerProperties.put("sasl.jaas.config", "security.protocol");
> kakaConsumerProperties.put("security.protocol", "");
>
> and I'm using LocationStrategies.PreferConsistent()
>
> Thanks
>
> On Tue, Apr 30, 2019 at 5:56 AM Akshay Bhardwaj <
> akshay.bhardwaj1...@gmail.com> wrote:
>
>> Hi Austin,
>>
>> Are you using Spark Streaming or Structured Streaming?
>>
>> For better understanding, could you also provide sample code/config
>> params for your spark-kafka connector for the said streaming job?
>>
>>
>> Akshay Bhardwaj
>> +91-97111-33849
>>
>>
>> On Mon, Apr 29, 2019 at 10:34 PM Austin Weaver 
>> wrote:
>>
>>> Hey guys, relatively new Spark Dev here and i'm seeing some kafka offset
>>> issues and was wondering if you guys could help me out.
>>>
>>> I am currently running a spark job on Dataproc and am getting errors
>>> trying to re-join a group and read data from a kafka topic. I have done
>>> some digging and am not sure what the issue is. I have auto.offset.reset set
>>> to earliest, so it should be reading from the earliest available
>>> non-committed offset, and initially my spark logs look like this:
>>>
>>> 19/04/29 16:30:30 INFO
>>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>>> partition demo.topic-11 to offset 5553330.
>>> 19/04/29 16:30:30 INFO
>>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>>> partition demo.topic-2 to offset 553.
>>> 19/04/29 16:30:30 INFO
>>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>>> partition demo.topic-3 to offset 484.
>>> 19/04/29 16:30:30 INFO
>>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>>> partition demo.topic-4 to offset 586.
>>> 19/04/29 16:30:30 INFO
>>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>>> partition demo.topic-5 to offset 502.
>>> 19/04/29 16:30:30 INFO
>>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>>> partition demo.topic-6 to offset 561.
>>> 19/04/29 16:30:30 INFO
>>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>>> partition demo.topic-7 to offset 542.```
>>>
>>> But then on the very next line I get an error trying to read from a
>>> nonexistent offset on the server (you can see that the offset for the
>>> partition differs from the one listed above, so I have no idea why it would
>>> be attempting to read from that offset); here is the error on the next line:
>>>
>>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets
>>> out of range with no configured reset policy for partitions:
>>> {demo.topic-11=4544296}
>>>
>>> Any ideas why my spark job is constantly going back to this offset
>>> (4544296), and not the one it outputs originally (5553330)?
>>>
>>> It seems to be contradicting itself with a) the actual offset it says it's
>>> on and the one it 

Best notebook for developing for apache spark using scala on Amazon EMR Cluster

2019-04-30 Thread V0lleyBallJunki3
Hello. I am using Zeppelin on an Amazon EMR cluster while developing Apache
Spark programs in Scala. The problem is that once the cluster is destroyed,
I lose all the notebooks on it. So over a period of time I have a lot of
notebooks that have to be manually exported to my local disk and from
there imported into each new EMR cluster I create. Is there a notebook
repository or tool that I can use where I can keep all my notebooks in a
folder and access them even on new EMR clusters? I know Jupyter is there, but
it doesn't support auto-complete for Scala.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Turning off Jetty Http Options Method

2019-04-30 Thread Ankit Jain
Hi Fellow Spark users,
We are using Spark 2.3.0 and our security team is reporting a violation that
Spark allows the HTTP OPTIONS method to work (this method exposes which
methods are supported by the endpoint, which could be exploited by a
hacker).

This method is on the Jetty web server; I see Spark uses Jetty for the web UI
and some internal communication as well.

For the Spark UI, we are planning to write a javax servlet filter, build a jar,
and add it to the Spark libs so that the OPTIONS method gets no response. We
don't have a clean solution for the internal Jetty server that is used as a
file server, though.

It would be nice if Spark itself didn't allow the OPTIONS method to work,
similar to what was done for TRACE -
https://issues.apache.org/jira/browse/SPARK-5983

What do you guys think? Does the community feel this should be added directly
to the Spark code?

Also, if there is a later version of Spark where this has been addressed,
please let us know too.

-- 
Thanks & Regards,
Ankit.


Re: Issue with offset management using Spark on Dataproc

2019-04-30 Thread Austin Weaver
@deng - There was a short erroneous period where 2 streams reading from the
same topic and group id were running at the same time. We saw errors from
this and stopped the extra stream. That being said, I would think that
auto.offset.reset would kick in regardless, since the documentation says it
kicks in if there is no existing current offset or the current offset no
longer exists on the kafka topic? Moreover, that doesn't explain the fact
that Spark logs that it is on one offset for that partition (5553330) - and
then immediately errors out trying to read the old offset (4544296) that no
longer exists?

@Akshay - I am using Spark Streaming (D-streams). Here is a snippet of the
kafka consumer configuration I am using (some fields redacted) -

kakaConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
kakaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "");
kakaConsumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
kakaConsumerProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
RoundRobinAssignor.class.getName());
kakaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
kakaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
kakaConsumerProperties.put("auto.offset.reset", "earliest");
kakaConsumerProperties.put("sasl.mechanism", "PLAIN");
kakaConsumerProperties.put("sasl.jaas.config", "security.protocol");
kakaConsumerProperties.put("security.protocol", "");

and I'm using LocationStrategies.PreferConsistent()
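
For completeness, the stream itself is created with the standard kafka010
direct-stream API, roughly like the sketch below (broker, group id and topic are
placeholders, `ssc` is the existing StreamingContext, and the map just mirrors
the Java properties above):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Map equivalent of the consumer properties above (placeholder values)
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "group.id" -> "demo-group",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("demo.topic"), kafkaParams)
)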

Thanks

On Tue, Apr 30, 2019 at 5:56 AM Akshay Bhardwaj <
akshay.bhardwaj1...@gmail.com> wrote:

> Hi Austin,
>
> Are you using Spark Streaming or Structured Streaming?
>
> For better understanding, could you also provide sample code/config params
> for your spark-kafka connector for the said streaming job?
>
>
> Akshay Bhardwaj
> +91-97111-33849
>
>
> On Mon, Apr 29, 2019 at 10:34 PM Austin Weaver 
> wrote:
>
>> Hey guys, relatively new Spark Dev here and i'm seeing some kafka offset
>> issues and was wondering if you guys could help me out.
>>
>> I am currently running a spark job on Dataproc and am getting errors
>> trying to re-join a group and read data from a kafka topic. I have done
>> some digging and am not sure what the issue is. I have auto.offset.reset set
>> to earliest, so it should be reading from the earliest available
>> non-committed offset, and initially my spark logs look like this:
>>
>> 19/04/29 16:30:30 INFO
>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>> partition demo.topic-11 to offset 5553330.
>> 19/04/29 16:30:30 INFO
>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>> partition demo.topic-2 to offset 553.
>> 19/04/29 16:30:30 INFO
>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>> partition demo.topic-3 to offset 484.
>> 19/04/29 16:30:30 INFO
>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>> partition demo.topic-4 to offset 586.
>> 19/04/29 16:30:30 INFO
>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>> partition demo.topic-5 to offset 502.
>> 19/04/29 16:30:30 INFO
>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>> partition demo.topic-6 to offset 561.
>> 19/04/29 16:30:30 INFO
>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>> partition demo.topic-7 to offset 542.```
>>
>> But then on the very next line I get an error trying to read from a
>> nonexistent offset on the server (you can see that the offset for the
>> partition differs from the one listed above, so I have no idea why it would
>> be attempting to read from that offset); here is the error on the next line:
>>
>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets
>> out of range with no configured reset policy for partitions:
>> {demo.topic-11=4544296}
>>
>> Any ideas why my spark job is constantly going back to this offset
>> (4544296), and not the one it outputs originally (5553330)?
>>
>> It seems to be contradicting itself with a) the actual offset it says it's on
>> and the one it attempts to read, and b) saying there is no configured reset policy.
>> --
>> Austin Weaver
>> Software Engineer
>> FLYR, Inc.   www.flyrlabs.com
>>
>

-- 
Austin Weaver
Software Engineer
FLYR, Inc.   www.flyrlabs.com


Spark Structured Streaming | Highly reliable de-duplication strategy

2019-04-30 Thread Akshay Bhardwaj
Hi Experts,

I am using Spark Structured Streaming to read messages from Kafka, with a
producer that works with an at-least-once guarantee. This streaming job is
running on a YARN cluster with Hadoop 2.7 and Spark 2.3.

What is the most reliable strategy for avoiding duplicate data within the
stream in scenarios of fail-over or job restarts/re-submits, and for
guaranteeing an exactly-once, duplicate-free stream?


   1. One of the strategies I have read about other people using is to maintain
   an external KV store with a unique key/checksum for each incoming message, and
   write to a 2nd Kafka topic only if the checksum is not present in the KV store.
   - My doubt with this approach is how to ensure a safe write to both the
  2nd topic and the KV store (for storing the checksum) in the case of unwanted
  failures. How does that guarantee exactly-once with restarts?
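
For reference, the built-in watermark-based de-duplication I'm comparing against
would look roughly like this sketch (broker, topic and the uniqueId/eventTime
column names are made up; `spark` is the SparkSession):

import org.apache.spark.sql.functions._

// Kafka source, then extract a unique message id and an event time from the payload
val parsed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
  .option("subscribe", "input-topic")                // placeholder
  .load()
  .selectExpr("CAST(value AS STRING) AS json", "timestamp AS eventTime")
  .withColumn("uniqueId", get_json_object(col("json"), "$.uniqueId")) // hypothetical field

// State is kept only for the allowed lateness; duplicates inside that window are dropped
val deduped = parsed
  .withWatermark("eventTime", "10 minutes")
  .dropDuplicates("uniqueId", "eventTime")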

Any suggestions are highly appreciated.


Akshay Bhardwaj
+91-97111-33849


Re: How to specify number of Partition using newAPIHadoopFile()

2019-04-30 Thread Prateek Rajput
On Tue, Apr 30, 2019 at 6:48 PM Vatsal Patel 
wrote:

> *Issue: *
>
> When I am reading a sequence file in Spark, I can specify the number of
> partitions as an argument to the API, like this:
> *public <K, V> JavaPairRDD<K, V> sequenceFile(String path, Class<K> keyClass,
> Class<V> valueClass, int minPartitions)*
>
> *In newAPIHadoopFile(), this support has been removed; below are the APIs:*
>
>    - public <K, V, F extends InputFormat<K, V>> JavaPairRDD<K, V>
>    newAPIHadoopFile(String path, Class<F> fClass, Class<K> kClass,
>    Class<V> vClass, Configuration conf)
>
>    - public <K, V, F extends InputFormat<K, V>> JavaPairRDD<K, V>
>    newAPIHadoopRDD(Configuration conf, Class<F> fClass, Class<K> kClass,
>    Class<V> vClass)
>
> Is there a way to specify the number of partitions when reading an *Avro*
> file using *newAPIHadoopFile*()? I explored and found that we can pass a Hadoop
> Configuration and set various Hadoop properties on it, e.g. the split size via
> ("mapred.max.split.size", "50mb"). Based on that it will calculate the number
> of partitions, but then each partition's size may or may not be equal to or
> less than the specified size.
>
>- *note - *A way other than repartition()
>
> *Execution Environment*
>
>- SPARK-JAVA VERSION - 2.4.0
>- JDK VERSION - 1.8
>- SPARK ARTIFACTID - spark-core_2.11
>- AVRO VERSION -  1.8.2
>
> Please help us understand why this is happening.
>
> Thanks,
> Vatsal
>


Re: Anaconda installation with Pyspark/Pyarrow (2.3.0+) on cloudera managed server

2019-04-30 Thread Patrick McCarthy
Hi Rishi,

I've had success using the approach outlined here:
https://community.hortonworks.com/articles/58418/running-pyspark-with-conda-env.html

Does this work for you?

On Tue, Apr 30, 2019 at 12:32 AM Rishi Shah 
wrote:

> Modified the subject; I would like to clarify that I am looking to create
> an Anaconda parcel with pyarrow and other libraries, so that I can
> distribute it on the Cloudera cluster.
>
> On Tue, Apr 30, 2019 at 12:21 AM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> I have been trying to figure out a way to build an Anaconda parcel with
>> pyarrow included for my Cloudera-managed cluster for distribution, but this
>> doesn't seem to work right. Could someone please help?
>>
>> I have tried installing Anaconda on one of the management nodes on the
>> Cloudera cluster and tarring the directory, but that directory doesn't
>> include all the packages needed to form a proper parcel for distribution.
>>
>> Any help is much appreciated!
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>
>
> --
> Regards,
>
> Rishi Shah
>


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Fwd: How to specify number of Partition using newAPIHadoopFile()

2019-04-30 Thread Vatsal Patel
*Issue: *

When I am reading a sequence file in Spark, I can specify the number of
partitions as an argument to the API, like this:
*public <K, V> JavaPairRDD<K, V> sequenceFile(String path, Class<K> keyClass,
Class<V> valueClass, int minPartitions)*

*In newAPIHadoopFile(), this support has been removed; below are the APIs:*

   - public <K, V, F extends InputFormat<K, V>> JavaPairRDD<K, V>
   newAPIHadoopFile(String path, Class<F> fClass, Class<K> kClass,
   Class<V> vClass, Configuration conf)

   - public <K, V, F extends InputFormat<K, V>> JavaPairRDD<K, V>
   newAPIHadoopRDD(Configuration conf, Class<F> fClass, Class<K> kClass,
   Class<V> vClass)

Is there a way to specify the number of partitions when reading an *Avro*
file using *newAPIHadoopFile*()? I explored and found that we can pass a Hadoop
Configuration and set various Hadoop properties on it, e.g. the split size via
("mapred.max.split.size", "50mb"). Based on that it will calculate the number
of partitions, but then each partition's size may or may not be equal to or
less than the specified size.

   - *note* - I'm looking for a way other than repartition()
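
For what it's worth, a rough sketch of the Configuration-based approach in Scala
(the Java call is analogous): the path is a placeholder, `sc` is the SparkContext,
and I believe the new-API name of the split-size property is
mapreduce.input.fileinputformat.split.maxsize, with the value given in bytes.

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable

// Copy the cluster Hadoop conf and cap the split size (bytes)
val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize", (50L * 1024 * 1024).toString)

val avroRdd = sc.newAPIHadoopFile(
  "/path/to/data.avro",                       // placeholder path
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable],
  hadoopConf)

println(s"partitions = ${avroRdd.getNumPartitions}")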

*Execution Environment*

   - SPARK-JAVA VERSION - 2.4.0
   - JDK VERSION - 1.8
   - SPARK ARTIFACTID - spark-core_2.11
   - AVRO VERSION -  1.8.2

Please help us understand why this is happening.

Thanks,
Vatsal


Re: Issue with offset management using Spark on Dataproc

2019-04-30 Thread Akshay Bhardwaj
Hi Austin,

Are you using Spark Streaming or Structured Streaming?

For better understanding, could you also provide sample code/config params
for your spark-kafka connector for the said streaming job?


Akshay Bhardwaj
+91-97111-33849


On Mon, Apr 29, 2019 at 10:34 PM Austin Weaver  wrote:

> Hey guys, relatively new Spark Dev here and i'm seeing some kafka offset
> issues and was wondering if you guys could help me out.
>
> I am currently running a spark job on Dataproc and am getting errors
> trying to re-join a group and read data from a kafka topic. I have done
> some digging and am not sure what the issue is. I have auto.offset.reset set
> to earliest, so it should be reading from the earliest available
> non-committed offset, and initially my spark logs look like this:
>
> 19/04/29 16:30:30 INFO
> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
> clientId=consumer-1, groupId=demo-group] Resetting offset for
> partition demo.topic-11 to offset 5553330.
> 19/04/29 16:30:30 INFO
> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
> clientId=consumer-1, groupId=demo-group] Resetting offset for
> partition demo.topic-2 to offset 553.
> 19/04/29 16:30:30 INFO
> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
> clientId=consumer-1, groupId=demo-group] Resetting offset for
> partition demo.topic-3 to offset 484.
> 19/04/29 16:30:30 INFO
> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
> clientId=consumer-1, groupId=demo-group] Resetting offset for
> partition demo.topic-4 to offset 586.
> 19/04/29 16:30:30 INFO
> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
> clientId=consumer-1, groupId=demo-group] Resetting offset for
> partition demo.topic-5 to offset 502.
> 19/04/29 16:30:30 INFO
> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
> clientId=consumer-1, groupId=demo-group] Resetting offset for
> partition demo.topic-6 to offset 561.
> 19/04/29 16:30:30 INFO
> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
> clientId=consumer-1, groupId=demo-group] Resetting offset for
> partition demo.topic-7 to offset 542.```
>
> But then on the very next line I get an error trying to read from a
> nonexistent offset on the server (you can see that the offset for the
> partition differs from the one listed above, so I have no idea why it would
> be attempting to read from that offset); here is the error on the next line:
>
> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets
> out of range with no configured reset policy for partitions:
> {demo.topic-11=4544296}
>
> Any ideas why my spark job is constantly going back to this offset
> (4544296), and not the one it outputs originally (5553330)?
>
> It seems to be contradicting itself with a) the actual offset it says it's on
> and the one it attempts to read, and b) saying there is no configured reset policy.
> --
> Austin Weaver
> Software Engineer
> FLYR, Inc.   www.flyrlabs.com
>


Re: Handle Null Columns in Spark Structured Streaming Kafka

2019-04-30 Thread SNEHASISH DUTTA
Hi

The NA functions will replace null with some default value, and not all my
columns are of type string, so for some other data types (long/int etc.) I
would have to provide some default value.

But ideally those values should stay null.
This null-column drop is actually happening in this step:

df.selectExpr("to_json(struct(*)) AS value")

Is it possible to retain the columns where all the values are null in
this step, without using the na functions?
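
In case it helps, a minimal reproducer sketch (made-up data; `spark` is the
spark-shell SparkSession):

import spark.implicits._

// Made-up data: column "b" is entirely null
val df = Seq(("x", None: Option[Long]), ("y", None: Option[Long])).toDF("a", "b")
df.selectExpr("to_json(struct(*)) AS value").show(false)
// prints {"a":"x"} and {"a":"y"} - the all-null column "b" is omitted from the JSON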

Regards,
Snehasish

On Tue, Apr 30, 2019, 4:58 AM Jason Nerothin 
wrote:

> See also here:
> https://stackoverflow.com/questions/44671597/how-to-replace-null-values-with-a-specific-value-in-dataframe-using-spark-in-jav
>
> On Mon, Apr 29, 2019 at 5:27 PM Jason Nerothin 
> wrote:
>
>> Spark SQL has had an na.fill function on it since at least 2.1. Would
>> that work for you?
>>
>>
>> https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/DataFrameNaFunctions.html
>>
>> On Mon, Apr 29, 2019 at 4:57 PM Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Hey Snehasish,
>>>
>>> Do you have a reproducer for this issue?
>>>
>>> Best Regards,
>>> Ryan
>>>
>>>
>>> On Wed, Apr 24, 2019 at 7:24 AM SNEHASISH DUTTA <
>>> info.snehas...@gmail.com> wrote:
>>>
 Hi,

 While writing to Kafka using Spark Structured Streaming, if all the
 values in a certain column are null, the column gets dropped.
 Is there any way to override this, other than using the na.fill functions?

 Regards,
 Snehasish

>>>
>>
>> --
>> Thanks,
>> Jason
>>
>
>
> --
> Thanks,
> Jason
>


Re: unsubscribe

2019-04-30 Thread Arne Zachlod

please read this to unsubscribe: https://spark.apache.org/community.html

TL;DR: send the mail to user-unsubscr...@spark.apache.org, not to the list

On 4/30/19 6:38 AM, Amrit Jangid wrote:




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Koalas show data in IDE or pyspark

2019-04-30 Thread Manu Zhang
Hi,

It seems a koalas.DataFrame can't be displayed in the terminal yet, as described
in https://github.com/databricks/koalas/issues/150, and the workaround is
to convert it to a pandas DataFrame.

Thanks,
Manu Zhang

On Tue, Apr 30, 2019 at 2:46 PM Achilleus 003 
wrote:

> Hello Everyone,
>
> I have been trying to run *koalas* on both pyspark and pyCharm IDE.
>
> When I run
>
> df = koalas.DataFrame({'x': [1, 2], 'y': [3, 4], 'z': [5, 6]})
> df.head(5)
>
> I don't get the data back; instead, I get an object.
>
> I thought df.head could be used to achieve this.
>
> Can anyone guide me on how we can print something on the terminal?
> Something similar to df.show() in spark.
>
>


Koalas show data in IDE or pyspark

2019-04-30 Thread Achilleus 003
Hello Everyone,

I have been trying to run *koalas* on both pyspark and pyCharm IDE.

When I run

df = koalas.DataFrame({'x': [1, 2], 'y': [3, 4], 'z': [5, 6]})
df.head(5)

I don't get the data back; instead, I get an object.

I thought df.head could be used to achieve this.

Can anyone guide me on how we can print something on the terminal?
Something similar to df.show() in spark.