Re: [Structured Streaming] NullPointerException in long running query

2020-04-29 Thread Shixiong(Ryan) Zhu
The JVM omits the stack trace when the same exception is thrown too many
times from the same place. This usually happens when multiple Spark tasks
on the same executor JVM keep throwing the same exception. See
https://stackoverflow.com/a/3010106
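
As a rough illustration of that JVM behaviour (a sketch, not taken from the job in this thread; the class and method names are made up for the example), the code below provokes the same implicit NullPointerException in a loop and reports when, if ever, the stack trace comes back empty. Whether and when that happens depends on the JVM and its JIT compiler, so no particular iteration count is promised.

```java
final class FastThrowDemo {

    /** Triggers an implicit NullPointerException and returns it (null if none was thrown). */
    static NullPointerException provokeNpe(String s) {
        try {
            s.length(); // if s is null, the JVM itself raises the NPE here
            return null;
        } catch (NullPointerException e) {
            return e;
        }
    }

    /** Returns the first iteration whose NPE had an empty stack trace, or -1 if none did. */
    static int firstEmptyTrace(int iterations) {
        for (int i = 0; i < iterations; i++) {
            NullPointerException e = provokeNpe(null);
            if (e.getStackTrace().length == 0) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int n = firstEmptyTrace(1_000_000);
        System.out.println(n < 0
                ? "stack traces were never omitted in this run"
                : "stack trace first omitted at iteration " + n);
    }
}
```

If this is what happened here, the earliest occurrences of the exception in the executor log should still carry the full trace. HotSpot's -XX:-OmitStackTraceInFastThrow flag turns the optimization off; in Spark it could be passed to the executors through spark.executor.extraJavaOptions.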

Best Regards,
Ryan


On Tue, Apr 28, 2020 at 10:45 PM lec ssmi  wrote:

> It is probably a data quality problem on my side. I'm curious why the
> driver-side exception stack has no specific exception information.
>
> On Tue, Apr 28, 2020 at 3:32 PM, Edgardo Szrajber wrote:
>
>> The exception occurred while aborting the stage. It might be interesting
>> to try to understand why the stage was aborted.
>> Maybe a timeout? How long did the query run?
>> Bentzi
>>
>>
>> On Tue, Apr 28, 2020 at 9:25, Jungtaek Lim
>>  wrote:
>> The root cause of the exception occurred on the executor side ("Lost task 10.3
>> in stage 1.0 (TID 81, spark6, executor 1)"), so you may need to check there.
>>
>> On Tue, Apr 28, 2020 at 2:52 PM lec ssmi  wrote:
>>
>> Hi:
>>   One of my long-running queries occasionally encountered the following
>> exception:
>>
>>
>>   Caused by: org.apache.spark.SparkException: Job aborted due to stage
>> failure: Task 10 in stage 1.0 failed 4 times, most recent failure: Lost
>> task 10.3 in stage 1.0 (TID 81, spark6, executor 1):
>> java.lang.NullPointerException
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
>> at scala.Option.foreach(Option.scala:257)
>> at
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:929)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:927)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:927)
>> at
>> org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475)
>> at
>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473)
>> at
>> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>> at
>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org
>> $apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>> at
>> 

Re: Spark 2.3 and Kafka client library version

2020-04-28 Thread Shixiong(Ryan) Zhu
You should be able to override the Kafka client version. The Kafka APIs
used by Structured Streaming also exist in newer Kafka versions. There is a
known correctness issue in Kafka 0.10.1.*. Other versions should be fine.

Best Regards,
Ryan


On Tue, Apr 28, 2020 at 1:54 PM Ahn, Daniel 
wrote:

> I have a Kerberized HDFS cluster. When I use Structured Streaming with
> Kafka (with SASL_SSL/PLAINTEXT), I believe I’m blocked by KAFKA-5294.
>
>
>
> It seems the fix version is the 0.11.0.0 Kafka client library. I have a Spark
> 2.3 cluster, and it’s using the 0.10.0.1 Kafka client library. Do you know if I
> can override the Kafka client version at runtime?
>
>
>
> - Dan


Re: Problems during upgrade 2.2.2 -> 2.4.4

2020-01-31 Thread Shixiong(Ryan) Zhu
The reason is that Spark RPC and the persisted states of HA mode both use
Java serialization to serialize internal classes, which don't have any
compatibility guarantee.

Best Regards,
Ryan


On Fri, Jan 31, 2020 at 9:08 AM Shixiong(Ryan) Zhu 
wrote:

> Unfortunately, Spark standalone mode doesn't support rolling upgrades. All
> Spark components (master, worker, driver) must be updated to the same
> version. When using HA mode, the states persisted in ZooKeeper (or in files if
> not using ZooKeeper) need to be cleaned because they are not compatible
> between versions.
>
> Best Regards,
> Ryan
>
>
> On Wed, Jan 29, 2020 at 2:12 AM bsikander  wrote:
>
>> Anyone?
>> This question is not regarding my application running on top of Spark.
>> The question is about the upgrade of spark itself from 2.2 to 2.4.
>>
>> I expected at least that Spark would recover from upgrades gracefully and
>> recover its own persisted objects.
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> --
>
> Best Regards,
> Ryan
>


Re: Problems during upgrade 2.2.2 -> 2.4.4

2020-01-31 Thread Shixiong(Ryan) Zhu
Unfortunately, Spark standalone mode doesn't support rolling upgrades. All
Spark components (master, worker, driver) must be updated to the same
version. When using HA mode, the states persisted in ZooKeeper (or in files if
not using ZooKeeper) need to be cleaned because they are not compatible
between versions.

Best Regards,
Ryan


On Wed, Jan 29, 2020 at 2:12 AM bsikander  wrote:

> Anyone?
> This question is not regarding my application running on top of Spark.
> The question is about the upgrade of spark itself from 2.2 to 2.4.
>
> I expected at least that Spark would recover from upgrades gracefully and
> recover its own persisted objects.
>
>
>
> --

Best Regards,
Ryan


Re: Issue with offset management using Spark on Dataproc

2019-04-30 Thread Shixiong(Ryan) Zhu
I recommend using Structured Streaming, as it has a patch that can
work around this issue: https://issues.apache.org/jira/browse/SPARK-26267

Best Regards,
Ryan


On Tue, Apr 30, 2019 at 3:34 PM Shixiong(Ryan) Zhu 
wrote:

> There is a known issue that Kafka may return a wrong offset even if there
> is no reset happening: https://issues.apache.org/jira/browse/KAFKA-7703
>
> Best Regards,
> Ryan
>
>
> On Tue, Apr 30, 2019 at 10:41 AM Austin Weaver 
> wrote:
>
>> @deng - There was a short erroneous period where 2 streams reading from
>> the same topic and group id were running at the same time. We saw errors
>> from this and stopped the extra stream. That being said, I would think
>> that auto.offset.reset would kick in regardless, since the documentation says
>> that it kicks in if there is no existing current offset or if the
>> current offset no longer exists on the Kafka topic? Moreover, that doesn't
>> explain the fact that Spark logs that it is on one offset for that
>> partition (5553330) and then immediately errors out trying to read the
>> old offset (4544296) that no longer exists?
>>
>> @Akshay - I am using Spark Streaming (DStreams). Here is a snippet of the
>> Kafka consumer configuration I am using (some fields redacted):
>>
>> kakaConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
>> kakaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "");
>> kakaConsumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
>> kakaConsumerProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
>>  RoundRobinAssignor.class.getName());
>> kakaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
>> StringDeserializer.class);
>> kakaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
>> StringDeserializer.class);
>> kakaConsumerProperties.put("auto.offset.reset", "earliest");
>> kakaConsumerProperties.put("sasl.mechanism", "PLAIN");
>> kakaConsumerProperties.put("sasl.jaas.config", "security.protocol");
>> kakaConsumerProperties.put("security.protocol", "");
>>
>> and I'm using LocationStrategies.PreferConsistent()
>>
>> Thanks
>>
>> On Tue, Apr 30, 2019 at 5:56 AM Akshay Bhardwaj <
>> akshay.bhardwaj1...@gmail.com> wrote:
>>
>>> Hi Austin,
>>>
>>> Are you using Spark Streaming or Structured Streaming?
>>>
>>> For better understanding, could you also provide sample code/config
>>> params for your spark-kafka connector for the said streaming job?
>>>
>>>
>>> Akshay Bhardwaj
>>> +91-97111-33849
>>>
>>>
>>> On Mon, Apr 29, 2019 at 10:34 PM Austin Weaver 
>>> wrote:
>>>
>>>> Hey guys, relatively new Spark dev here. I'm seeing some Kafka
>>>> offset issues and was wondering if you could help me out.
>>>>
>>>> I am currently running a Spark job on Dataproc and am getting errors
>>>> trying to re-join a group and read data from a Kafka topic. I have done
>>>> some digging and am not sure what the issue is. I have
>>>> auto.offset.reset set to earliest, so it should be reading from the
>>>> earliest available non-committed offset, and initially my Spark logs look
>>>> like this:
>>>>
>>>> 19/04/29 16:30:30 INFO
>>>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>>>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>>>> partition demo.topic-11 to offset 5553330.
>>>> 19/04/29 16:30:30 INFO
>>>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>>>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>>>> partition demo.topic-2 to offset 553.
>>>> 19/04/29 16:30:30 INFO
>>>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>>>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>>>> partition demo.topic-3 to offset 484.
>>>> 19/04/29 16:30:30 INFO
>>>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>>>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>>>> partition demo.topic-4 to offset 586.
>>>> 19/04/29 16:30:30 INFO
>>>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>>>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>>>> partition d

Re: Issue with offset management using Spark on Dataproc

2019-04-30 Thread Shixiong(Ryan) Zhu
There is a known issue that Kafka may return a wrong offset even if there
is no reset happening: https://issues.apache.org/jira/browse/KAFKA-7703

Best Regards,
Ryan


On Tue, Apr 30, 2019 at 10:41 AM Austin Weaver  wrote:

> @deng - There was a short erroneous period where 2 streams reading from
> the same topic and group id were running at the same time. We saw errors
> from this and stopped the extra stream. That being said, I would think
> that auto.offset.reset would kick in regardless, since the documentation says
> that it kicks in if there is no existing current offset or if the
> current offset no longer exists on the Kafka topic? Moreover, that doesn't
> explain the fact that Spark logs that it is on one offset for that
> partition (5553330) and then immediately errors out trying to read the
> old offset (4544296) that no longer exists?
>
> @Akshay - I am using Spark Streaming (DStreams). Here is a snippet of the
> Kafka consumer configuration I am using (some fields redacted):
>
> kakaConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
> kakaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "");
> kakaConsumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
> kakaConsumerProperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
>  RoundRobinAssignor.class.getName());
> kakaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> StringDeserializer.class);
> kakaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> StringDeserializer.class);
> kakaConsumerProperties.put("auto.offset.reset", "earliest");
> kakaConsumerProperties.put("sasl.mechanism", "PLAIN");
> kakaConsumerProperties.put("sasl.jaas.config", "security.protocol");
> kakaConsumerProperties.put("security.protocol", "");
>
> and I'm using LocationStrategies.PreferConsistent()
>
> Thanks
>
> On Tue, Apr 30, 2019 at 5:56 AM Akshay Bhardwaj <
> akshay.bhardwaj1...@gmail.com> wrote:
>
>> Hi Austin,
>>
>> Are you using Spark Streaming or Structured Streaming?
>>
>> For better understanding, could you also provide sample code/config
>> params for your spark-kafka connector for the said streaming job?
>>
>>
>> Akshay Bhardwaj
>> +91-97111-33849
>>
>>
>> On Mon, Apr 29, 2019 at 10:34 PM Austin Weaver 
>> wrote:
>>
>>> Hey guys, relatively new Spark dev here. I'm seeing some Kafka offset
>>> issues and was wondering if you could help me out.
>>>
>>> I am currently running a Spark job on Dataproc and am getting errors
>>> trying to re-join a group and read data from a Kafka topic. I have done
>>> some digging and am not sure what the issue is. I have auto.offset.reset set
>>> to earliest, so it should be reading from the earliest available
>>> non-committed offset, and initially my Spark logs look like this:
>>>
>>> 19/04/29 16:30:30 INFO
>>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>>> partition demo.topic-11 to offset 5553330.
>>> 19/04/29 16:30:30 INFO
>>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>>> partition demo.topic-2 to offset 553.
>>> 19/04/29 16:30:30 INFO
>>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>>> partition demo.topic-3 to offset 484.
>>> 19/04/29 16:30:30 INFO
>>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>>> partition demo.topic-4 to offset 586.
>>> 19/04/29 16:30:30 INFO
>>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>>> partition demo.topic-5 to offset 502.
>>> 19/04/29 16:30:30 INFO
>>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>>> partition demo.topic-6 to offset 561.
>>> 19/04/29 16:30:30 INFO
>>> org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
>>> clientId=consumer-1, groupId=demo-group] Resetting offset for
>>> partition demo.topic-7 to offset 542.
>>>
>>> But then on the very next line I get an error trying to read from a
>>> nonexistent offset on the server. You can see that the offset for the
>>> partition differs from the one listed above, so I have no idea why it would
>>> be attempting to read from that offset. Here is the error on the next line:
>>>
>>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets
>>> out of range with no configured reset policy for partitions:
>>> {demo.topic-11=4544296}
>>>
>>> Any ideas as to why my Spark job is constantly going back to this offset
>>> (4544296), and not the one it outputs originally (5553330)?
>>>
>>> It seems to be contradicting itself w a) the actual offset it says its
>>> on and the one it 

Re: Spark streaming error - Query terminated with exception: assertion failed: Invalid batch: a#660,b#661L,c#662,d#663,,… 26 more fields != b#1291L

2019-04-01 Thread Shixiong(Ryan) Zhu
Could you try to use $"a" rather than df("a")? The latter sometimes
doesn’t work.

On Thu, Mar 21, 2019 at 10:41 AM kineret M  wrote:

> I am trying to read a stream using my custom data source (v2, using Spark 2.3),
> and it fails *in the second iteration* with the following exception while
> reading pruned columns: Query [id=xxx, runId=yyy] terminated with
> exception: assertion failed: Invalid batch: a#660,b#661L,c#662,d#663,,...
> 26 more fields != b#1291L
>
> DataFrame creation:
>
> val df = sparkSession.readStream.format("myV2Source").load("/")
> val df1 = df.filter(df("a") >= "-1").select("b")
>
> Stream execution:
>
> val streamingQuery = df1
>   .writeStream
>   .format("console")
>   .trigger(Trigger.ProcessingTime("10 seconds"))
>   .outputMode(OutputMode.Append())
>   .start()
>
> streamingQuery.awaitTermination()
>
> if I remove the select (i.e. val df1 = df.filter(df("a") >= "-1")), it
> works fine.
>
> Any idea why?
>
-- 

Best Regards,
Ryan


Re: Spark Kafka Batch Write guarantees

2019-04-01 Thread Shixiong(Ryan) Zhu
The Kafka data source doesn’t support transactions. You may see partial or
duplicated data if a Spark task fails.

On Wed, Mar 27, 2019 at 1:15 AM hemant singh  wrote:

> We are using Spark batch to write a DataFrame to a Kafka topic, using the
> Spark write function with write.format("kafka").
> Does Spark provide a guarantee similar to the one it provides when saving a
> DataFrame to disk, i.e. that partial data is not written to Kafka: either the
> full DataFrame is saved or, if the job fails, no data is written to the topic?
>
> Thanks.
>
-- 

Best Regards,
Ryan


Re: Structured streaming from Kafka by timestamp

2019-01-24 Thread Shixiong(Ryan) Zhu
Hey Tomas,

From your description, you just ran a batch query rather than a Structured
Streaming query. The Kafka data source doesn't support filter push down
right now. But that's definitely doable. One workaround here is setting
proper "startingOffsets" and "endingOffsets" options when loading from
Kafka.
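
To make the workaround a bit more concrete, here is a small sketch that builds the JSON value those two options accept for specific offsets: {"topic":{"partition":offset}}, where -2 stands for earliest and -1 for latest. The helper class, the topic name and the offset numbers are hypothetical. Finding the real offsets for a given timestamp is left out; the Kafka consumer's offsetsForTimes call is one way to look them up.

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

final class KafkaOffsetsJson {

    // Special offset values understood by the Kafka source options.
    static final long LATEST = -1L;
    static final long EARLIEST = -2L;

    /**
     * Renders {"topic":{"0":10,"1":20}} for a single topic.
     * Partitions are sorted so the output is stable. The topic name is not
     * escaped, which is fine for legal Kafka topic names (letters, digits, '.', '_', '-').
     */
    static String forTopic(String topic, Map<Integer, Long> offsetsByPartition) {
        StringJoiner partitions = new StringJoiner(",", "{", "}");
        for (Map.Entry<Integer, Long> e : new TreeMap<>(offsetsByPartition).entrySet()) {
            partitions.add("\"" + e.getKey() + "\":" + e.getValue());
        }
        return "{\"" + topic + "\":" + partitions + "}";
    }

    public static void main(String[] args) {
        // Hypothetical offsets bracketing the time range of interest.
        Map<Integer, Long> start = new TreeMap<>();
        start.put(0, 1200L);
        start.put(1, 950L);
        Map<Integer, Long> end = new TreeMap<>();
        end.put(0, 1800L);
        end.put(1, 1400L);

        String startingOffsets = forTopic("demo.topic", start);
        String endingOffsets = forTopic("demo.topic", end);
        System.out.println(startingOffsets);
        System.out.println(endingOffsets);

        // With Spark on the classpath, the strings would be passed roughly like this:
        //   spark.read().format("kafka")
        //        .option("kafka.bootstrap.servers", "...")
        //        .option("subscribe", "demo.topic")
        //        .option("startingOffsets", startingOffsets)
        //        .option("endingOffsets", endingOffsets)
        //        .load();
    }
}
```

With the range bounded this way, the timestamp filter in the query only has to run over the records between the two offsets instead of the whole topic.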

Best Regards,
Ryan


On Thu, Jan 24, 2019 at 10:15 AM Gabor Somogyi 
wrote:

> Hi Tomas,
>
> As a general note, I don't fully understand your use case. You've mentioned
> Structured Streaming, but your query is more like a one-time SQL statement.
> Kafka doesn't support predicates in the way it's integrated with Spark. What can
> be done from the Spark perspective is to look up the offset for a specific
> lowest timestamp and start reading from there.
>
> BR,
> G
>
>
> On Thu, Jan 24, 2019 at 6:38 PM Tomas Bartalos 
> wrote:
>
>> Hello,
>>
>> I'm trying to read Kafka via spark structured streaming. I'm trying to
>> read data within specific time range:
>>
>> select count(*) from kafka_table where timestamp > cast('2019-01-23 1:00'
>> as TIMESTAMP) and timestamp < cast('2019-01-23 1:01' as TIMESTAMP);
>>
>>
>> The problem is that timestamp query is not pushed-down to Kafka, so Spark
>> tries to read the whole topic from beginning.
>>
>>
>> explain query:
>>
>> 
>>
>>  +- *(1) Filter ((isnotnull(timestamp#57) && (timestamp#57 >
>> 15351480)) && (timestamp#57 < 15352344))
>>
>>
>> Scan
>> KafkaRelation(strategy=Subscribe[keeper.Ticket.avro.v1---production],
>> start=EarliestOffsetRangeLimit, end=LatestOffsetRangeLimit)
>> [key#52,value#53,topic#54,partition#55,offset#56L,timestamp#57,timestampType#58]
>> *PushedFilters: []*, ReadSchema:
>> struct>
>>
>> Obviously the query takes forever to complete. Is there a solution to
>> this ?
>>
>> I'm using kafka and kafka-client version 1.1.1
>>
>>
>> BR,
>>
>> Tomas
>>
>


Re: Spark Streaming join taking long to process

2018-11-27 Thread Shixiong(Ryan) Zhu
If you are using the same code to run on YARN, I believe it’s still using
local mode, as the master set in the code overrides the master URL set by the
CLI. You can check the “Executors” tab in the Spark UI to see how many
executors are running, and verify that it matches your config.
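
A toy model of the precedence involved, not Spark's actual code: per the Spark configuration docs, a value set directly in the application wins over a spark-submit flag, which in turn wins over spark-defaults.conf. The class and method names here are invented for the illustration.

```java
final class MasterPrecedence {

    /**
     * Picks the master URL the way the documented precedence describes:
     * value set in code first, then the spark-submit flag, then the defaults file.
     * A null argument means "not set at that level".
     */
    static String effectiveMaster(String setInCode, String submitFlag, String defaultsFile) {
        if (setInCode != null) {
            return setInCode;
        }
        if (submitFlag != null) {
            return submitFlag;
        }
        if (defaultsFile != null) {
            return defaultsFile;
        }
        throw new IllegalStateException("no master URL configured");
    }

    public static void main(String[] args) {
        // .master("local") in the code plus --master yarn on the command line:
        System.out.println(effectiveMaster("local", "yarn", null));
        // No master in the code, so the command-line value is used:
        System.out.println(effectiveMaster(null, "yarn", null));
    }
}
```

In other words, dropping the hard-coded .master("local") call from the builder is what lets the master passed on the command line take effect.
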
On Tue, Nov 27, 2018 at 6:17 AM Abhijeet Kumar 
wrote:

> Yes, it did.
> Thanks for the solution. I solved it locally, but I’m worried about how I can
> do this when I’m using YARN, because it takes the same 15 seconds on YARN too
> :)
>
> On 27-Nov-2018, at 4:42 PM, Srikanth Sriram 
> wrote:
>
> Hello Abhijeet,
> I am not sure my answer will solve your problem but just think of the
> master configuration set for spark application.
>
> val spark = SparkSession.builder
>   .appName("Argoid_Realtime_Pipeline")
>   .master("local")
>   .getOrCreate()
>
> I see you have set it as "local", not as "local[*]".
>
> From another blog, I got this information, which I'm sharing in full:
> "We are going to work locally, running the application straight from our
> IDE, so we are setting the master to local[*], meaning we are creating as
> many threads as there are cores on the machine."
>
> Just check whether this reduces the processing time, since with
> local[*] we use all available cores, not just one.
>
> Regards,
> Sriram Srikanth
>
> On Tue, Nov 27, 2018 at 1:46 PM Abhijeet Kumar <
> abhijeet.ku...@sentienz.com> wrote:
>
>> Hi All,
>>
>> I'm just practicing Spark Streaming by joining two different streams. I
>> noticed that it's taking around 15 seconds for each record. Let me share
>> the details and the code:
>>
>> As you can see, stage id 2 is taking 15 s. Isn't this strange?
>>
>> Code:
>>
>> import org.apache.spark.sql.SparkSession
>> import org.apache.spark.sql.functions._
>> import org.apache.spark.sql.streaming.Trigger
>> import org.apache.spark.sql.types.TimestampType
>> import org.apache.log4j.{Level, Logger}
>>
>> object StreamJoin{
>>
>>   val kafkaTopic1 = "demo2"
>>   val kafkaTopic2 = "demo3"
>>   val bootstrapServer = "localhost:9092"
>>
>>   def main(args: Array[String]): Unit = {
>> val checkPointDir = "hdfs://localhost:8020/checkpo"
>>
>> val spark = SparkSession.builder
>>   .appName("Argoid_Realtime_Pipeline")
>>   .master("local")
>>   .getOrCreate()
>>
>> val rootLogger = Logger.getRootLogger()
>> rootLogger.setLevel(Level.ERROR)
>>
>> import spark.implicits._
>>
>> val df1 = spark
>>   .readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", bootstrapServer)
>>   .option("subscribe", kafkaTopic1)
>>   .option("failOnDataLoss", "false")
>>   .load()
>>
>> val df2 = spark
>>   .readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", bootstrapServer)
>>   .option("subscribe", kafkaTopic2)
>>   .option("failOnDataLoss", "false")
>>   .load()
>>
>> val order_details = df1
>>   .withColumn("s_order_id", get_json_object($"value".cast("String"), 
>> "$.order_id"))
>>   .withColumn("s_customer_id", get_json_object($"value".cast("String"), 
>> "$.customer_id"))
>>   .withColumn("s_promotion_id", get_json_object($"value".cast("String"), 
>> "$.promotion_id"))
>>   .withColumn("s_store_id", get_json_object($"value".cast("String"), 
>> "$.store_id"))
>>   .withColumn("s_product_id", get_json_object($"value".cast("String"), 
>> "$.product_id"))
>>   .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), 
>> "$.warehouse_id"))
>>   .withColumn("unit_cost", get_json_object($"value".cast("String"), 
>> "$.unit_cost"))
>>   .withColumn("total_cost", get_json_object($"value".cast("String"), 
>> "$.total_cost"))
>>   .withColumn("units_sold", get_json_object($"value".cast("String"), 
>> "$.units_sold"))
>>   .withColumn("promotion_cost", get_json_object($"value".cast("String"), 
>> "$.promotion_cost"))
>>   .withColumn("date_of_order", get_json_object($"value".cast("String"), 
>> "$.date_of_order"))
>>   .withColumn("tstamp_trans", current_timestamp())
>>   .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", 
>> "MMddHHmmss").cast(TimestampType))
>>   .select($"s_customer_id", $"s_order_id", $"s_promotion_id", 
>> $"s_store_id", $"s_product_id",
>> $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
>> $"total_cost".cast("integer") as "total_cost", 
>> $"promotion_cost".cast("integer") as "promotion_cost",
>> $"date_of_order", $"tstamp_trans", $"TIMESTAMP", 
>> $"units_sold".cast("integer") as "units_sold")
>>
>> val invoice_details = df2
>>   .withColumn("order_id", get_json_object($"value".cast("String"), 
>> "$.order_id"))
>>   .withColumn("invoice_status", get_json_object($"value".cast("String"), 
>> "$.invoice_status"))
>>   .where($"invoice_status" === "Success")
>>
>>   .withColumn("tstamp_trans", current_timestamp())
>>   

Re: Recreate Dataset from list of Row in spark streaming application.

2018-10-05 Thread Shixiong(Ryan) Zhu
oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor is
a ForeachWriter, right? You cannot use SparkSession in its process method,
as that method runs on executors.
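
A likely mechanism behind the NullPointerException, shown with plain Java serialization rather than Spark itself: my understanding is that the driver-side internals of SparkSession are marked transient, so they are not shipped along when the writer is serialized to an executor. The Handle class below is a hypothetical stand-in for such an object, not a Spark class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class TransientFieldDemo {

    /** Hypothetical stand-in for an object that holds driver-only state. */
    static final class Handle implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        // transient: skipped by Java serialization, so it is null after deserialization
        transient Object driverOnlyState = new Object();

        Handle(String name) {
            this.name = name;
        }
    }

    /** Serializes and deserializes the handle, as happens when a closure is shipped to another JVM. */
    static Handle roundTrip(Handle original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Handle) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Handle onDriver = new Handle("session");
        Handle onExecutor = roundTrip(onDriver);
        System.out.println("driver copy has state:   " + (onDriver.driverOnlyState != null));
        System.out.println("executor copy has state: " + (onExecutor.driverOnlyState != null));
    }
}
```

Any call that dereferences the missing state on the deserialized copy fails with a NullPointerException, which matches the shape of the trace below. Work that needs the session therefore has to happen on the driver, with process limited to per-row logic.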

Best Regards,
Ryan


On Fri, Oct 5, 2018 at 6:54 AM Kuttaiah Robin  wrote:

> Hello,
>
> I have a spark streaming application which reads from Kafka based on the
> given schema.
>
> Dataset<Row> m_oKafkaEvents =
>     getSparkSession().readStream().format("kafka")
>         .option("kafka.bootstrap.servers", strKafkaAddress)
>         .option("assign", strSubscription)
>         .option("maxOffsetsPerTrigger", "10")
>         .option("startingOffsets", "latest")
>         .option("failOnDataLoss", false)
>         .load()
>         .filter(strFilter)
>         .select(functions.from_json(functions.col("value").cast("string"),
>             schema).alias("events"))
>         .select("events.*");
>
>
> Now this dataset is grouped by one of the columns (InstanceId), which is the
> key for us, and then fed into the flatMapGroupsWithState function. This
> function does some correlation.
>
> Dataset<InsightEventUpdate> sessionUpdates = m_oKafkaEvents.groupByKey(
>     new MapFunction<Row, String>() {
>       @Override public String call(Row event) {
>         return event.getAs("InstanceId");
>       }
>     }, Encoders.STRING())
>     .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
>         Encoders.bean(InsightEventInfo.class),
>         Encoders.bean(InsightEventUpdate.class),
>         GroupStateTimeout.ProcessingTimeTimeout());
>
>
> The output dataset is of type InsightEventUpdate, which contains a List of
> Spark Rows related to the InstanceId.
>
> Now I want to convert this back into a Dataset<Row>. Basically, I
> have a List of Rows.
>
> I tried
>
> sparkSession.createDataFrame(listOfRows, schema);
>
> this gives me
>
> java.lang.NullPointerException
> at
> org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:139)
> at
> org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:137)
> at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
> at
> org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:376)
> at
> oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor.process(ForeachFederatedEventProcessor.java:102)
>
> Can someone tell me how to go ahead?
>
> thanks
> Robin Kuttaiah
>
>
>
>
>


Re: PySpark structured streaming job throws socket exception

2018-10-04 Thread Shixiong(Ryan) Zhu
As far as I know, the error log in updateAccumulators will not fail a Spark
task. Did you see other error messages?

Best Regards,
Ryan


On Thu, Oct 4, 2018 at 2:14 PM mmuru  wrote:

> Hi,
>
> I am running a PySpark Structured Streaming job on K8s with 2 executor pods.
> The driver pod failed with the following exception. It fails consistently
> after 3 to 6 hours of running.
>
> Any idea how to fix this exception? I would really appreciate your help.
>
>
> 2018-10-04 18:48:27 ERROR DAGScheduler:91 - Failed to update accumulators
> for task 21
> java.net.SocketException: Connection reset
> at java.net.SocketInputStream.read(SocketInputStream.java:210)
> at java.net.SocketInputStream.read(SocketInputStream.java:141)
> at java.net.SocketInputStream.read(SocketInputStream.java:224)
> at
> org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:659)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1257)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1249)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
>
> org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1249)
> at
>
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1331)
> at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2100)
> at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2052)
> at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2041)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> 2018-10-04 18:48:27 ERROR DAGScheduler:91 - Failed to update accumulators
> for task 22
> java.net.SocketException: Broken pipe (Write failed)
> at java.net.SocketOutputStream.socketWrite0(Native Method)
> at
> java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
> at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
> at
> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at
> java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> at java.io.DataOutputStream.flush(DataOutputStream.java:123)
> at
> org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:657)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1257)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1249)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
>
> org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1249)
> at
>
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1331)
> at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2100)
> at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2052)
> at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2041)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>
>
>
>


Re: Kafka Connector version support

2018-09-21 Thread Shixiong(Ryan) Zhu
-dev
+user

We don't backport new features to a maintenance branch. All new updates
will be just in 2.4.

Best Regards,
Ryan

On Fri, Sep 21, 2018 at 2:44 PM, Basil Hariri <
basil.har...@microsoft.com.invalid> wrote:

> Hi all,
>
>
>
> Are there any plans to backport the recent (2.4) updates to the
> Spark-Kafka adapter for use with Spark v2.3, or will the updates just be
> for v2.4+?
>
>
>
> Thanks,
>
> Basil
>


Re: Error while doing stream-stream inner join (java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access)

2018-07-03 Thread Shixiong(Ryan) Zhu
Which version are you using? This is a known issue that should be fixed in
2.3.1. See https://issues.apache.org/jira/browse/SPARK-23623 for details.

Best Regards,
Ryan

On Mon, Jul 2, 2018 at 3:56 AM, kant kodali  wrote:

> Hi All,
>
> I get the error below quite often when I do a stream-stream inner join on
> two data frames. After running several experiments, stream-stream
> joins don't look stable enough for production yet. Any advice on this?
>
> Thanks!
>
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> 18/07/02 09:32:14 INFO LineBufferedStream: stdout: at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> 18/07/02 09:32:14 INFO LineBufferedStream: stdout: at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
> 18/07/02 09:32:14 INFO LineBufferedStream: stdout: at org.apache.spark.sql.kafka010.CachedKafkaConsumer.close(CachedKafkaConsumer.scala:301)
>


Re: [structured-streaming][kafka] Will the Kafka readstream timeout after connections.max.idle.ms 540000 ms ?

2018-05-16 Thread Shixiong(Ryan) Zhu
The streaming query should keep polling data from Kafka. When the query was
stopped, did you see any exception?
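
On the override question: as far as I know, the Kafka source passes any option prefixed with `kafka.` through to the underlying Kafka consumer, so `connections.max.idle.ms` can be set that way. Note that this setting only controls when the client closes an idle connection; it should not by itself stop the query. A minimal PySpark sketch, assuming a broker at localhost:9092 and a topic named `events` (both hypothetical, as are the helper names):

```python
def kafka_source_options(bootstrap_servers, topic, consumer_overrides=None):
    """Build the option map for spark.readStream.format("kafka").

    Consumer settings are prefixed with "kafka." so that the Kafka source
    forwards them to the underlying consumer.
    """
    options = {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
    }
    for key, value in (consumer_overrides or {}).items():
        options["kafka." + key] = str(value)
    return options


def read_kafka_stream(spark, options):
    """Create a streaming DataFrame from an existing SparkSession."""
    return spark.readStream.format("kafka").options(**options).load()


# Hypothetical values: raise the idle timeout from 9 minutes to 30 minutes.
example_options = kafka_source_options(
    "localhost:9092", "events", {"connections.max.idle.ms": 30 * 60 * 1000}
)
```

Calling `read_kafka_stream(spark, example_options)` with a live SparkSession would then create the stream with the overridden consumer setting.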

Best Regards,
Shixiong Zhu
Databricks Inc.
shixi...@databricks.com

databricks.com

On Tue, May 15, 2018 at 6:25 PM, karthikjay  wrote:

> Hi all,
>
> We are running into a scenario where the structured streaming job exits
> after a while, specifically when the Kafka topic is not getting any data.
> From the job logs, I see connections.max.idle.ms = 540000. Does that
> mean the Spark readStream will close when it does not get data for 540000
> milliseconds? If yes, how do I override this?
>


Re: Continuous Processing mode behaves differently from Batch mode

2018-05-16 Thread Shixiong(Ryan) Zhu
One possible case is you don't have enough resources to launch all tasks
for your continuous processing query. Could you check the Spark UI and see
if all tasks are running rather than waiting for resources?

Best Regards,
Shixiong Zhu
Databricks Inc.
shixi...@databricks.com

databricks.com

On Tue, May 15, 2018 at 5:38 PM, Yuta Morisawa  wrote:

> Hi all
>
> I am using Structured Streaming in Continuous Processing mode and I
> have run into an odd problem.
>
> My code is simple and very similar to the sample code in the
> documentation:
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing
>
>
> When I send the same text data ten times, for example 10 lines of text, in
> Batch mode the result has 100 lines.
>
> But in Continuous Processing mode the result has only 10 lines.
> It appears that duplicated lines are removed.
>
> The only difference between the two programs is whether the trigger method
> is called.
>
> Why do the two programs behave differently?
>
>
> --
> Regard,
> Yuta
>


Re: I cannot use spark 2.3.0 and kafka 0.9?

2018-05-08 Thread Shixiong(Ryan) Zhu
"note that the 0.8 integration is compatible with later 0.9 and 0.10
brokers, but the 0.10 integration is not compatible with earlier brokers."

This is pretty clear: you can use the 0.8 integration to talk to a 0.9 broker.

Best Regards,
Shixiong Zhu
Databricks Inc.
shixi...@databricks.com

databricks.com

On Fri, May 4, 2018 at 2:02 AM, kant kodali  wrote:

> Hi All,
>
> This link seems to suggest I can't use Spark 2.3.0 with a Kafka 0.9 broker. Is
> that correct?
>
> https://spark.apache.org/docs/latest/streaming-kafka-integration.html
>
> Thanks!
>


Re: Structured Streaming + Kafka - Corrupted Checkpoint Offsets / Commits

2018-01-04 Thread Shixiong(Ryan) Zhu
The root cause is probably that HDFSMetadataLog ignores exceptions thrown
by "output.close". I think this should be fixed by this line in Spark 2.2.1
and 3.0.0:
https://github.com/apache/spark/commit/6edfff055caea81dc3a98a6b4081313a0c0b0729#diff-aaeb546880508bb771df502318c40a99L126

Could you try 2.2.1?

On Thu, Jan 4, 2018 at 9:08 AM, William Briggs  wrote:

> I am running a Structured Streaming job (Spark 2.2.0) using EMR 5.9. The
> job sources data from a Kafka topic, performs a variety of filters and
> transformations, and sinks data back into a different Kafka topic.
>
> Once per day, we stop the query in order to merge the namenode edit logs
> with the fsimage, because Structured Streaming creates and destroys a
> significant number of HDFS files, and EMR doesn't support a secondary or HA
> namenode for fsimage compaction (AWS support directed us to do this, as
> Namenode edit logs were filling the disk).
>
> Occasionally, the Structured Streaming query will not restart because the
> most recent file in the "commits" or "offsets" checkpoint subdirectory is
> empty. This seems like an undesirable behavior, as it requires manual
> intervention to remove the empty files in order to force the job to fall
> back onto the last good values. Has anyone run into this behavior? The only
> similar issue I can find is SPARK-21760, which appears to
> have no fix or workaround.
>
> Any assistance would be greatly appreciated!
>
> Regards,
> Will
>


Re: Standalone Cluster: ClassNotFound org.apache.kafka.common.serialization.ByteArrayDeserializer

2017-12-28 Thread Shixiong(Ryan) Zhu
The cluster mode doesn't upload jars to the driver node. This is a known
issue: https://issues.apache.org/jira/browse/SPARK-4160

On Wed, Dec 27, 2017 at 1:27 AM, Geoff Von Allmen 
wrote:

> I’ve tried it both ways.
>
> The uber jar gives me the following:
>
>- Caused by: java.lang.ClassNotFoundException: Failed to find data
>source: kafka. Please find packages at
>http://spark.apache.org/third-party-projects.html
>
> If I only do minimal packaging, add
> org.apache.spark_spark-sql-kafka-0-10_2.11-2.2.0.jar as a --package, and
> then add it to the --driver-class-path, I get past that error, but I get
> the error I showed in the original post.
>
> I agree it seems to be missing the kafka-clients jar file, as that is where
> the ByteArrayDeserializer is, though it looks like it’s present as far as
> I can tell.
>
> I can see the following two packages in the ClassPath entries on the
> history server (though the source shows: **(redacted), not sure
> why?)
>
>- spark://:/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar
>- spark://:/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.2.0.jar
>
> As a side note, I’m running both a master and a worker on the same system
> just to test out running in cluster mode. Not sure if that would have
> anything to do with it. I would think it would make it easier since it's
> got access to all the same file system... but I'm pretty new to Spark.
>
> I have also read through and followed those instructions as well as many
> others at this point.
>
> Thanks!
>
> On Wed, Dec 27, 2017 at 12:56 AM, Eyal Zituny 
> wrote:
>
>> Hi,
>> It seems that you're missing the kafka-clients jar (and probably some
>> other dependencies as well).
>> How did you package your application jar? Does it include all the
>> required dependencies (as an uber jar)?
>> If it's not an uber jar, you need to pass, via the driver-class-path and
>> the executor-class-path, all the files/dirs where your dependencies can be
>> found (note that those must be accessible from each node in the cluster).
>> I suggest going over the manual.
>>
>> Eyal
>>
>>
>> On Wed, Dec 27, 2017 at 1:08 AM, Geoff Von Allmen wrote:
>>
>>> I am trying to deploy a standalone cluster but running into
>>> ClassNotFound errors.
>>>
>>> I have tried a whole myriad of different approaches, ranging from
>>> packaging all dependencies into a single JAR to using the --packages
>>> and --driver-class-path options.
>>>
>>> I’ve got a master node started, a slave node running on the same system,
>>> and am using spark submit to get the streaming job kicked off.
>>>
>>> Here is the error I’m getting:
>>>
>>> Exception in thread "main" java.lang.reflect.InvocationTargetException
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
>>> at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
>>> Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
>>> at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala:376)
>>> at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala)
>>> at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateStreamOptions(KafkaSourceProvider.scala:323)
>>> at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:60)
>>> at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:198)
>>> at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:88)
>>> at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:88)
>>> at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
>>> at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
>>> at com.Customer.start(Customer.scala:47)
>>> at com.Main$.main(Main.scala:23)
>>> at com.Main.main(Main.scala)
>>> ... 6 more
>>> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> ... 18 more

Re: PySpark 2.2.0, Kafka 0.10 DataFrames

2017-11-20 Thread Shixiong(Ryan) Zhu
You are using the Spark Streaming Kafka package. For Structured Streaming,
the correct package name is "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0".

On Mon, Nov 20, 2017 at 4:15 PM, salemi  wrote:

> Yes, we are using --packages
>
> $SPARK_HOME/bin/spark-submit --packages
> org.apache.spark:spark-streaming-kafka-0-10_2.11:2.2.0 --py-files shell.py
>


Re: How to print plan of Structured Streaming DataFrame

2017-11-20 Thread Shixiong(Ryan) Zhu
-dev +user

Which Spark version are you using? There is a bug in older Spark versions.
Try the latest version.

In addition, you can call `query.explain()` as well.
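
A rough PySpark sketch of that second suggestion, assuming `word_counts` is the streaming aggregate from the word-count example (the function name is hypothetical). The plan is printed from the handle of the started query; if no batch has run yet, there may be no physical plan to show.

```python
def start_and_explain(word_counts, extended=False):
    """Start a console query and print the plan of the running query."""
    query = (
        word_counts.writeStream
        .outputMode("complete")
        .format("console")
        .start()
    )
    # StreamingQuery.explain() prints the physical plan of the most recent
    # execution; before the first batch there may be nothing to print yet.
    query.explain(extended)
    return query
```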

On Mon, Nov 20, 2017 at 4:00 AM, Chang Chen  wrote:

> Hi Guys
>
> I modified StructuredNetworkWordCount to see what the executed plan is;
> here is my code:
>
> val wordCounts = words.groupBy("value").count()
>
> // Start running the query that prints the running counts to the console
> val query = wordCounts.writeStream
>   .outputMode("complete")
>   .format("console")
>   .start()
>
> wordCounts.explain()  // additional codes
>
>
> But it failed with “AnalysisException: Queries with streaming sources must
> be executed with writeStream.start()”?
>
>
> Thanks
> Chang
>
>


Re: spark-stream memory table global?

2017-11-10 Thread Shixiong(Ryan) Zhu
It must be accessed under the same SparkSession. We can also add an option
to make it a global temp view. Feel free to open a PR to improve it.
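
To illustrate the scoping, a minimal PySpark sketch, assuming `df` is a streaming DataFrame and `spark` is the session that created it (the function and table names are hypothetical). The table written by the memory sink is named after the query and is queried through the same session.

```python
def start_memory_query(df, table_name):
    """Write a streaming DataFrame to the in-memory sink under table_name."""
    return (
        df.writeStream
        .format("memory")
        .queryName(table_name)  # becomes the name of the in-memory table
        .outputMode("append")
        .start()
    )


def read_memory_table(spark, table_name):
    """Query the in-memory table; this must use the same SparkSession."""
    return spark.sql("SELECT * FROM {}".format(table_name))
```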

On Fri, Nov 10, 2017 at 4:56 AM, Imran Rajjad  wrote:

> Hi,
>
> Is the memory table into which Structured Streaming results are sunk
> available to other Spark applications on the cluster? Is it global by
> default, or is it only available to the context where the streaming is
> being done?
>
> thanks
> Imran
>
> --
> I.R
>


Re: Structured Stream in Spark

2017-10-27 Thread Shixiong(Ryan) Zhu
The code in the link writes the data into files. Did you check the output
location?

By the way, if you want to see the data on the console, you can use the
console sink by changing this line *format("parquet").option("path",
outputPath + "/ETL").partitionBy("creationTime").start()* to
*format("console").start().*

On Fri, Oct 27, 2017 at 8:41 AM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi TathagataDas,
>
> I was trying to use eventhub with spark streaming. It looks like I was able
> to connect successfully, but I cannot see any data on the console. I am not
> sure whether eventhub is supported.
>
> https://github.com/Azure/spark-eventhubs/blob/master/examples/src/main/scala/com/microsoft/spark/sql/examples/EventHubsStructuredStreamingExample.scala
> is the code snippet I used to connect to eventhub.
>
> Thanks,
> Asmath
>
>
>
> On Thu, Oct 26, 2017 at 9:39 AM, KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> Thanks TD.
>>
>> On Wed, Oct 25, 2017 at 6:42 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Please do not confuse old Spark Streaming (DStreams) with Structured
>>> Streaming. Structured Streaming's offset and checkpoint management is far
>>> more robust than DStreams.
>>> Take a look at my talk - https://spark-summit.org/2017/speakers/tathagata-das/
>>>
>>> On Wed, Oct 25, 2017 at 9:29 PM, KhajaAsmath Mohammed <
>>> mdkhajaasm...@gmail.com> wrote:
>>>
>>>> Thanks Subhash.
>>>>
>>>> Have you ever used the zero data loss concept with streaming? I am a bit
>>>> worried about using streaming when it comes to data loss.
>>>>
>>>> https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/
>>>>
>>>>
>>>> Does structured streaming handle it internally?
>>>>
>>>> On Wed, Oct 25, 2017 at 3:10 PM, Subhash Sriram <
>>>> subhash.sri...@gmail.com> wrote:
>>>>
>>>>> No problem! Take a look at this:
>>>>>
>>>>> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>>>>>
>>>>> Thanks,
>>>>> Subhash
>>>>>
>>>>> On Wed, Oct 25, 2017 at 4:08 PM, KhajaAsmath Mohammed <
>>>>> mdkhajaasm...@gmail.com> wrote:
>>>>>
>>>>>> Hi Sriram,
>>>>>>
>>>>>> Thanks. This is what I was looking for.
>>>>>>
>>>>>> One question: where do we need to specify the checkpoint directory in
>>>>>> case of structured streaming?
>>>>>>
>>>>>> Thanks,
>>>>>> Asmath
>>>>>>
>>>>>> On Wed, Oct 25, 2017 at 2:52 PM, Subhash Sriram <
>>>>>> subhash.sri...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Asmath,
>>>>>>>
>>>>>>> Here is an example of using structured streaming to read from Kafka:
>>>>>>>
>>>>>>> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala
>>>>>>>
>>>>>>> In terms of parsing the JSON, there is a from_json function that you
>>>>>>> can use. The following might help:
>>>>>>>
>>>>>>> https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
>>>>>>>
>>>>>>> I hope this helps.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Subhash
>>>>>>>
>>>>>>> On Wed, Oct 25, 2017 at 2:59 PM, KhajaAsmath Mohammed <
>>>>>>> mdkhajaasm...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Could anyone provide suggestions on how to parse json data from
>>>>>>>> kafka and load it back in hive.
>>>>>>>>
>>>>>>>> I have read about structured streaming but didn't find any
>>>>>>>> examples. Is there any best practice on how to read it and parse it
>>>>>>>> with structured streaming for this use case?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Asmath
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Queries with streaming sources must be executed with writeStream.start()

2017-09-09 Thread Shixiong(Ryan) Zhu
It's because "toJSON" doesn't support Structured Streaming. The current
implementation will convert the Dataset to an RDD, which is not supported
by streaming queries.
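
One workaround that is commonly used, offered here as a suggestion rather than something from this thread: build the JSON string with the `to_json` SQL function, which operates column-wise and so avoids the RDD conversion. A PySpark sketch, assuming `df` is the streaming DataFrame and that `to_json` is available as a SQL function in your Spark version (the function name `rows_as_json` is hypothetical):

```python
def rows_as_json(df):
    """Return a DataFrame with one string column holding each row as JSON.

    struct(*) packs all columns of the row into a single struct, and
    to_json serializes that struct.
    """
    return df.selectExpr("to_json(struct(*)) AS value")
```

The result is still a streaming DataFrame, so it can be followed by `writeStream...start()` as usual.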

On Sat, Sep 9, 2017 at 4:40 PM, kant kodali  wrote:

> yes it is a streaming dataset. so what is the problem with following code?
>
> Dataset ds = dataset.toJSON().map(()->{some function that returns a 
> string});
>  StreamingQuery query = ds.writeStream().start();
>  query.awaitTermination();
>
>
> On Sat, Sep 9, 2017 at 4:20 PM, Felix Cheung 
> wrote:
>
>> What is newDS?
>> If it is a Streaming Dataset/DataFrame (since you have writeStream there)
>> then there seems to be an issue preventing toJSON to work.
>>
>> --
>> *From:* kant kodali 
>> *Sent:* Saturday, September 9, 2017 4:04:33 PM
>> *To:* user @spark
>> *Subject:* Queries with streaming sources must be executed with
>> writeStream.start()
>>
>> Hi All,
>>
>> I  have the following code and I am not sure what's wrong with it? I
>> cannot call dataset.toJSON() (which returns a DataSet) ? I am using spark
>> 2.2.0 so I am wondering if there is any work around?
>>
>>  Dataset ds = newDS.toJSON().map(()->{some function that returns a 
>> string});
>>  StreamingQuery query = ds.writeStream().start();
>>  query.awaitTermination();
>>
>>
>


Re: [Structured Streaming]Data processing and output trigger should be decoupled

2017-08-30 Thread Shixiong(Ryan) Zhu
I don't think that's a good idea. If the engine keeps on processing data
but doesn't output anything, where would it keep the intermediate data?

On Wed, Aug 30, 2017 at 9:26 AM, KevinZwx  wrote:

> Hi,
>
> I'm working with Structured Streaming, and I'm wondering whether the
> trigger could be improved.
>
> Currently, when I specify a trigger, e.g. trigger(Trigger.ProcessingTime(
> "10 minutes")), the engine begins processing data at the time the trigger
> fires, i.e. 10:00:00, 10:10:00, 10:20:00, etc. If the engine takes 10s
> to process a batch of data, we get the output at 10:00:10, and then the
> engine just waits without processing any data. When the next trigger
> fires, the engine processes the data that arrived during the interval, and
> if this time it takes 15s to process the batch, we get the result at
> 10:10:15. This is the problem.
>
> In my understanding, the trigger and data processing should be decoupled:
> the engine should keep processing data as fast as possible, but only
> generate output at each trigger, so that we get the results at
> 10:00:00, 10:10:00, 10:20:00, ... Is there any solution for this, or any
> plan to work on it?
>


Re: PySpark, Structured Streaming and Kafka

2017-08-23 Thread Shixiong(Ryan) Zhu
You can use `bin/pyspark --packages
org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0` to start "pyspark". If
you want to use "spark-submit", you also need to provide your Python file.
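
When PySpark comes from `pip install pyspark` and is driven from a notebook, the same package can, as far as I know, be requested through the `spark.jars.packages` configuration, provided it is set before the first SparkSession (and its JVM) is created. A minimal sketch; the helper names and the app name are hypothetical:

```python
def kafka_sql_package(scala_version="2.11", spark_version="2.2.0"):
    """Maven coordinate of the Structured Streaming Kafka source."""
    return "org.apache.spark:spark-sql-kafka-0-10_{}:{}".format(
        scala_version, spark_version
    )


def build_session(app_name="kafka-structured-streaming"):
    """Create a SparkSession that resolves the Kafka source at startup."""
    # Imported lazily so that the helper above works without pyspark installed.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.jars.packages", kafka_sql_package())
        .getOrCreate()
    )
```

The Scala and Spark versions in the coordinate should match the installed PySpark build.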

On Wed, Aug 23, 2017 at 1:41 PM, Brian Wylie 
wrote:

> Hi All,
>
> I'm trying the new hotness of using Kafka and Structured Streaming.
>
> Resources that I've looked at
> - https://spark.apache.org/docs/latest/streaming-programming-guide.html
> - https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
> - https://spark.apache.org/docs/latest/streaming-custom-receivers.html
> - http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/Structured%20Streaming%20using%20Python%20DataFrames%20API.html
>
> My setup is a bit weird (yes.. yes.. I know...)
> - Eventually I'll just use a DataBricks cluster and life will be bliss :)
> - But for now I want to test/try stuff out on my little Mac Laptop
>
> The newest version of PySpark will install a local Spark server with a
> simple:
> $ pip install pyspark
>
> This is very nice. I've put together a little notebook using that kewl
> feature:
> - https://github.com/Kitware/BroThon/blob/master/notebooks/Bro_to_Spark_Cheesy.ipynb
>
> So the next step was to set up and use a Kafka message queue, and that went
> well/works fine.
>
> $ kafka-console-consumer --bootstrap-server localhost:9092 --topic dns
>
> *I get messages spitting out*
>
> {"ts":1503513688.232274,"uid":"CdA64S2Z6Xh555","id.orig_h":"192.168.1.7","id.orig_p":58528,"id.resp_h":"192.168.1.1","id.resp_p":53,"proto":"udp","trans_id":43933,"rtt":0.02226,"query":"brian.wylie.is.awesome.tk","qclass":1,"qclass_name":"C_INTERNET","qtype":1,"qtype_name":"A","rcode":0,"rcode_name":"NOERROR","AA":false,"TC":false,"RD":true,"RA":true,"Z":0,"answers":["17.188.137.55","17.188.142.54","17.188.138.55","17.188.141.184","17.188.129.50","17.188.128.178","17.188.129.178","17.188.141.56"],"TTLs":[25.0,25.0,25.0,25.0,25.0,25.0,25.0,25.0],"rejected":false}
>
>
> Okay, finally getting to my question:
> - Local spark server (good)
> - Local kafka server and messages getting produced (good)
> - Trying to run this line of PySpark code (not good)
>
> # Setup connection to Kafka Stream
> dns_events = spark.readStream.format('kafka')\
>   .option('kafka.bootstrap.servers', 'localhost:9092')\
>   .option('subscribe', 'dns')\
>   .option('startingOffsets', 'latest')\
>   .load()
>
>
> fails with:
> java.lang.ClassNotFoundException: Failed to find data source: kafka.
> Please find packages at http://spark.apache.org/third-party-projects.html
>
> I've looked at the URL listed... and poking around I can see that maybe
> I need the kafka jar file as part of my local server.
>
> I lamely tried this:
> $ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0
>
> Exception in thread "main" java.lang.IllegalArgumentException: Missing application resource.
> at org.apache.spark.launcher.CommandBuilderUtils.checkArgument(CommandBuilderUtils.java:241)
> at org.apache.spark.launcher.SparkSubmitCommandBuilder.buildSparkSubmitArgs(SparkSubmitCommandBuilder.java:160)
> at org.apache.spark.launcher.SparkSubmitCommandBuilder.buildSparkSubmitCommand(SparkSubmitCommandBuilder.java:274)
> at org.apache.spark.launcher.SparkSubmitCommandBuilder.buildCommand(SparkSubmitCommandBuilder.java:151)
> at org.apache.spark.launcher.Main.main(Main.java:86)
>
>
> Anyway, all my code/versions/etc are in this notebook:
> - https://github.com/Kitware/BroThon/blob/master/notebooks/Bro_to_Spark.ipynb
>
> I'd be tremendously appreciative of some super nice, smart person if they
> could point me in the right direction :)
>
> -Brian Wylie
>


Re: [StructuredStreaming] multiple queries of the socket source: only one query works.

2017-08-12 Thread Shixiong(Ryan) Zhu
Spark creates one connection for each query. The behavior you observed is
because of how "nc -lk" works. If you use `netstat` to check the TCP
connections, you will see that there are two connections when starting two
queries. However, "nc" forwards the input to only one connection.
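
For local experiments with several queries, a small test server that writes every line to all open connections avoids this `nc` limitation. The sketch below is a hypothetical stand-in for `nc -lk`, not part of Spark; the class name and defaults are made up for illustration.

```python
import socket
import threading


class BroadcastLineServer:
    """Accepts any number of TCP clients and sends each line to all of them."""

    def __init__(self, host="127.0.0.1", port=0):
        self._listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._listener.bind((host, port))  # port 0 lets the OS pick a free port
        self._listener.listen()
        self.port = self._listener.getsockname()[1]
        self._clients = []
        self._lock = threading.Lock()
        threading.Thread(target=self._accept_loop, daemon=True).start()

    def _accept_loop(self):
        while True:
            try:
                conn, _ = self._listener.accept()
            except OSError:  # listener was closed
                return
            with self._lock:
                self._clients.append(conn)

    def client_count(self):
        with self._lock:
            return len(self._clients)

    def send_line(self, text):
        """Send one newline-terminated line to every connected client."""
        data = (text + "\n").encode("utf-8")
        with self._lock:
            for conn in list(self._clients):
                try:
                    conn.sendall(data)
                except OSError:  # drop clients that have gone away
                    self._clients.remove(conn)
                    conn.close()
```

Each query's socket source would then connect to `server.port` and receive its own copy of every line passed to `send_line`.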

On Fri, Aug 11, 2017 at 10:59 PM, Rick Moritz  wrote:

> Hi Gerard, hi List,
>
> I think what this would entail is for Source.commit to change its
> functionality. You would need to track all streams' offsets there.
> Especially in the socket source, you already have a cache (I haven't looked
> at Kafka's implementation too closely yet), so that shouldn't be the issue,
> if at start time all streams subscribed to a source are known.
> What I worry about is that this may need an API change, to pass a
> stream ID into commit. Since different streams can use different Triggers,
> you can have any number of unforeseeable results when multiple threads
> call commit.
>
> I'll look into that, since I am in the process of building a
> TwitterSource based on the socket source's general functionality, and due
> to the API restrictions there, it's even more important for multiple
> streams using one source.
>
> What I did observe was that every query initialized a separate source.
> This won't work so well with socket, since the socket is in use once you
> try to set up a second one. It also won't work so well with Twitter, since
> usually an API key is limited in how often it can be used simultaneously
> (likely at 2).
>
> An alternative to the socket source issue would be to open a new free
> socket, but then the user has to figure out where the source is listening.
>
> I second Gerard's request for additional information, and confirmation of
> my theories!
>
> Thanks,
>
> Rick
>
> On Fri, Aug 11, 2017 at 2:53 PM, Gerard Maas 
> wrote:
>
>> Hi,
>>
>> I've been investigating this SO question:
>> https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming
>>
>> TL;DR: when using the Socket source, trying to create multiple queries
>> does not work properly; only the first query in the start order will
>> receive data.
>>
>> This minimal example reproduces the issue:
>>
>> val lines = spark
>> .readStream
>> .format("socket")
>> .option("host", "localhost")
>> .option("port", "")
>> .option("includeTimestamp", true)
>> .load()
>>
>> val q1 = lines.writeStream
>>   .outputMode("append")
>>   .format("console")
>>   .start()
>>
>> val q2 = lines.withColumn("foo", lit("foo")).writeStream
>>   .outputMode("append")
>>   .format("console")
>>   .start()
>>
>> Sample output (spark shell):
>>
>> Batch: 0
>> ---
>> +-----+-------------------+
>> |value|          timestamp|
>> +-----+-------------------+
>> |  aaa|2017-08-11 23:37:59|
>> +-----+-------------------+
>>
>> ---
>> Batch: 1
>> ---
>> +-----+-------------------+
>> |value|          timestamp|
>> +-----+-------------------+
>> |  aaa|2017-08-11 23:38:00|
>> +-----+-------------------+
>>
>> q1.stop
>>
>> scala> ---
>> Batch: 0
>> ---
>> +-----+-------------------+---+
>> |value|          timestamp|foo|
>> +-----+-------------------+---+
>> |b|2017-08-11 23:38:19|foo|
>> +-----+-------------------+---+
>>
>> ---
>> Batch: 1
>> ---
>> +-----+-------------------+---+
>> |value|          timestamp|foo|
>> +-----+-------------------+---+
>> |b|2017-08-11 23:38:19|foo|
>> +-----+-------------------+---+
>>
>> This is certainly unexpected behavior. Even though the socket source is
>> marked "not for production", I wouldn't expect it to be so limited.
>>
>> Am I right to think that the first running query consumes all the data in
>> the source, and therefore all the other queries do not work (until the
>> previous ones are stopped)?
>>
>> Is this a generalized behavior? E.g., does each query started on a structured
>> streaming job fully consume the source? And can the Kafka source be used
>> with multiple queries because it can be replayed?
>>
>> As a workaround, would there be a way to cache the incoming data to
>> multiplex it? We cannot call `cache` on a streaming dataset, but is there
>> maybe a way to do that?
>>
>> Could I have more details on the execution model (I've consumed all I
>> could find) and what are the (near) future plans?
>>
>> thanks!
>>
>> -Gerard.
>>
>>
>


Re: spark streaming socket read issue

2017-06-30 Thread Shixiong(Ryan) Zhu
Could you show the code that starts the StreamingQuery from the Dataset? If
you don't call `writeStream.start(...)`, it won't run anything.
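
For reference, a minimal PySpark sketch of a complete socket query (the Java API follows the same pattern). `load()` only defines the source; nothing is read until `writeStream...start()` is called. The function name is hypothetical, and the host and port arguments are placeholders, since the port is not shown in the original message.

```python
def run_socket_query(spark, host, port):
    """Read lines from a socket and print them to the console."""
    lines = (
        spark.readStream
        .format("socket")
        .option("host", host)
        .option("port", port)
        .load()  # defines the source only; no data is read yet
    )
    query = lines.writeStream.format("console").start()  # starts execution
    return query
```

The caller would typically follow this with `query.awaitTermination()` to keep the application alive.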

On Fri, Jun 30, 2017 at 6:47 AM, pradeepbill  wrote:

> Hi there, I have a Spark streaming issue that I am not able to figure out.
> The code below reads from a socket, but I don't see any input going into the
> job. I have nc -l  running and dumping data, though, so I am not sure why my
> Spark job is not able to read data from  10.176.110.112:. Please
> advise.
>
> Dataset d = sparkSession.readStream().format("socket")
> .option("host",
> "10.176.110.112").option("port", ).load();
>
>
> thanks
> Pradeep
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/spark-streaming-socket-read-issue-tp28813.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Building Kafka 0.10 Source for Structured Streaming Error.

2017-06-28 Thread Shixiong(Ryan) Zhu
"--package" will add transitive dependencies that are not
"$SPARK_HOME/external/kafka-0-10-sql/target/*.jar".

> i have tried building the jar with dependencies, but still face the same
> error.

What command did you use?

On Wed, Jun 28, 2017 at 12:00 PM, satyajit vegesna <
satyajit.apas...@gmail.com> wrote:

> Hi All,
>
> I am trying to build the Kafka-0-10-sql module under the external folder in
> the Apache Spark source code.
> Once I generate the jar file using
> build/mvn package -DskipTests -pl external/kafka-0-10-sql
> I get a jar file created under external/kafka-0-10-sql/target.
>
> Then I try to run spark-shell with the jars created in the target folder, as below:
> bin/spark-shell --jars $SPARK_HOME/external/kafka-0-10-sql/target/*.jar
>
> i get below error based on the command,
>
> Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
> 17/06/28 11:54:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> Spark context Web UI available at http://10.1.10.241:4040
> Spark context available as 'sc' (master = local[*], app id = local-1498676043936).
> Spark session available as 'spark'.
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 2.3.0-SNAPSHOT
>       /_/
>
> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131)
> Type in expressions to have them evaluated.
> Type :help for more information.
>
> scala> val lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test").load()
> java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
>   at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala:378)
>   at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala)
>   at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateStreamOptions(KafkaSourceProvider.scala:325)
>   at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:60)
>   at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:192)
>   at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
>   at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
>   at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
>   at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
>   ... 48 elided
> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 57 more
>
> ++
>
> I have tried building the jar with dependencies, but still face the same
> error.
>
> But when I use --packages with spark-shell, i.e. bin/spark-shell
> --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0, it works
> fine.
>
> The reason I am trying to build from source is that I
> want to try pushing DataFrame data into a Kafka topic, based on the commit
> https://github.com/apache/spark/commit/b0a5cd89097c563e9949d8cfcf84d18b03b8d24c,
> which doesn't work with version 2.1.0.
>
>
> Any help would be highly appreciated.
>
>
> Regards,
>
> Satyajit.
>
>
>


Re: ZeroMQ Streaming in Spark2.x

2017-06-26 Thread Shixiong(Ryan) Zhu
It has moved to Apache Bahir: http://bahir.apache.org/

You can find the documentation there.

On Mon, Jun 26, 2017 at 11:58 AM, Aashish Chaudhary <
aashish.chaudh...@kitware.com> wrote:

> Hi there,
>
> I am a beginner when it comes to Spark streaming. I was looking for some
> examples related to ZeroMQ and Spark and realized that ZeroMQUtils is no
> longer present in Spark 2.x.
>
> I would appreciate if someone can shed some light on the history and what
> I could do to use ZeroMQ with Spark Streaming in the current version.
>
> Thanks,
>
>


Re: Error while doing mvn release for spark 2.0.2 using scala 2.10

2017-06-19 Thread Shixiong(Ryan) Zhu
Some of the projects (such as spark-tags) are Java projects. Spark doesn't
fix the artifact name for them and just hard-codes 2.11.

For your issue, try to use `install` rather than `package`.

On Sat, Jun 17, 2017 at 7:20 PM, Kanagha Kumar 
wrote:

> Hi,
>
> Bumping up again! Why does spark modules depend upon scala2.11 versions
> inspite of changing pom.xmls using ./dev/change-scala-version.sh 2.10.
> Appreciate any quick help!!
>
> Thanks
>
> On Fri, Jun 16, 2017 at 2:59 PM, Kanagha Kumar 
> wrote:
>
>> Hey all,
>>
>>
>> I'm trying to use Spark 2.0.2 with scala 2.10 by following this
>> https://spark.apache.org/docs/2.0.2/building-spark.html#building-for-scala-210
>>
>> ./dev/change-scala-version.sh 2.10
>> ./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package
>>
>>
>> I could build the distribution successfully using
>> bash -xv dev/make-distribution.sh --tgz  -Dscala-2.10 -DskipTests
>>
>> But, when I am trying to maven release, it keeps failing with the error
>> using the command:
>>
>>
>> Executing Maven:  -B -f pom.xml  -DscmCommentPrefix=[maven-release-plugin]
>> -e  -Dscala-2.10 -Pyarn -Phadoop-2.7 -Phadoop-provided -DskipTests
>> -Dresume=false -U -X *release:prepare release:perform*
>>
>> Failed to execute goal on project spark-sketch_2.10: Could not resolve
>> dependencies for project 
>> org.apache.spark:spark-sketch_2.10:jar:2.0.2-sfdc-3.0.0:
>> *Failure to find org.apache.spark:spark-tags_2.11:jar:2.0.2-sfdc-3.0.0*
>> in  was cached in the local repository, resolution will
>> not be reattempted until the update interval of nexus has elapsed or
>> updates are forced - [Help 1]
>>
>>
>> Why does spark-sketch depend upon spark-tags_2.11 when I have already
>> compiled against scala 2.10?? Any pointers would be helpful.
>> Thanks
>> Kanagha
>>
>
>


Re: Is Structured streaming ready for production usage

2017-06-08 Thread Shixiong(Ryan) Zhu
Please take a look at
http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

On Thu, Jun 8, 2017 at 4:46 PM, swetha kasireddy 
wrote:

> OK. Can we use Spark Kafka Direct with  Structured Streaming?
>
> On Thu, Jun 8, 2017 at 4:46 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> OK. Can we use Spark Kafka Direct as part of Structured Streaming?
>>
>> On Thu, Jun 8, 2017 at 3:35 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> YES. At Databricks, our customers have already been using Structured
>>> Streaming and in the last month alone processed over 3 trillion records.
>>>
>>> https://databricks.com/blog/2017/06/06/simple-super-fast-streaming-engine-apache-spark.html
>>>
>>> On Thu, Jun 8, 2017 at 3:03 PM, SRK  wrote:
>>>
>>>> Hi,
>>>>
>>>> Is structured streaming ready for production usage in Spark 2.2?
>>>>
>>>> Thanks,
>>>> Swetha





>>>
>>
>


Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread Shixiong(Ryan) Zhu
I don't know what happened in your case, so I cannot provide any workaround.
It would be great if you could provide the logs output by
HDFSBackedStateStoreProvider.

On Thu, May 25, 2017 at 4:05 PM, kant kodali <kanth...@gmail.com> wrote:

>
> On Thu, May 25, 2017 at 3:41 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> bin/hadoop fs -ls /usr/local/hadoop/checkpoint/state/0/*
>>
>
> Hi,
>
> There are no files under bin/hadoop fs -ls 
> /usr/local/hadoop/checkpoint/state/0/*
> but all the directories until /usr/local/hadoop/checkpoint/state/0 does
> exist(which are created by spark).
>
> yes I can attach the log but pretty much it looks like same as I sent on
> this thread.
>
> Is there any work around to this for now? Will create a ticket shortly.
>
> Thanks!
>


Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread Shixiong(Ryan) Zhu
Feel free to create a new ticket. Could you also provide the files in
"/usr/local/hadoop/checkpoint/state/0" (Just run "bin/hadoop fs -ls
/usr/local/hadoop/checkpoint/state/0/*") in the ticket and the Spark logs?

On Thu, May 25, 2017 at 2:53 PM, kant kodali <kanth...@gmail.com> wrote:

> Should I file a ticket or should I try another version like Spark 2.2
> since I am currently using 2.1.1?
>
> On Thu, May 25, 2017 at 2:38 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi Ryan,
>>
>> You are right I was setting checkpointLocation for readStream. Now I did
>> set if for writeStream as well  like below
>>
>> StreamingQuery query = df2.writeStream().foreach(new KafkaSink()).option(
>> "checkpointLocation","/usr/local/hadoop/checkpoint").outputMode("update"
>> ).start();
>>
>> query.awaitTermination();
>>
>> *and now I can at very least see there are directories like*
>>
>> -rw-r--r--   2 ubuntu supergroup 45 2017-05-25 21:29 
>> /usr/local/hadoop/checkpoint/metadata
>> drwxr-xr-x   - ubuntu supergroup  0 2017-05-25 21:30 
>> /usr/local/hadoop/checkpoint/offsets
>> drwxr-xr-x   - ubuntu supergroup  0 2017-05-25 21:29 
>> /usr/local/hadoop/checkpoint/sources
>> drwxr-xr-x   - ubuntu supergroup  0 2017-05-25 21:30 
>> /usr/local/hadoop/checkpoint/state
>>
>>
>> However it still fails with
>>
>> org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist:
>> /usr/local/hadoop/checkpoint/state/0/1/1.delta
>>
>>
>>
>> On Thu, May 25, 2017 at 2:31 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Read your codes again and found one issue: you set "checkpointLocation"
>>> in `readStream`. It should be set in `writeStream`. However, I still have
>>> no idea why use a temp checkpoint location will fail.
>>>
>>> On Thu, May 25, 2017 at 2:23 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> I did the following
>>>>
>>>> *bin/hadoop fs -mkdir -p **/usr/local/hadoop/checkpoint* and did 
>>>> *bin/hadoop
>>>> fs -ls / *
>>>>
>>>> and I can actually see */tmp* and */usr* and inside of */usr *there is
>>>> indeed *local/hadoop/checkpoint. *
>>>>
>>>> So until here it looks fine.
>>>>
>>>> I also cleared everything */tmp/** as @Michael suggested using *bin/hadoop
>>>> fs -rmdir * such that when I do *bin/hadoop fs -ls /tmp *I don't see
>>>> anything.
>>>>
>>>> Now I ran my spark driver program using spark-submit it failed with the
>>>> following exception
>>>>
>>>> *File does not exist:
>>>> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*
>>>>
>>>> so I did *bin/hadoop fs -ls *
>>>> */tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2 *
>>>>
>>>> and I did not see anything there like *1.delta(there are just no
>>>> files)* however all these directories 
>>>> */tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2
>>>>  *do exist.
>>>>
>>>> For my checkPointLocation I had passed   */usr/local/hadoop/checkpoint
>>>> *and  *hdfs:///usr/local/hadoop/checkpoint  *so far and
>>>> both didn't work for me. It is failing with the same error "*File does
>>>> not exist:
>>>> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*"
>>>>
>>>> so what can be the problem? any ideas?
>>>>
>>>> Thanks!
>>>>
>>>> On Thu, May 25, 2017 at 1:31 AM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Executing this bin/hadoop fs -ls /usr/local/hadoop/checkpoint says
>>>>>
>>>>> ls: `/usr/local/hadoop/checkpoint': No such file or directory
>>>>>
>>>>> This is what I expected as well since I don't see any checkpoint
>>>>> directory under /usr/local/hadoop. Am I missing any configuration variable
>>>>> like HADOOP_CONF_DIR ? I am currently not setting that in
>>>>> conf/spark-env.sh and thats the only hadoop related environment variable I
>>>>> see. please let me know
>>>>>
>>>>> thanks!
>>>>>
>>>>>
>>>

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread Shixiong(Ryan) Zhu
I read your code again and found one issue: you set "checkpointLocation" in
`readStream`. It should be set in `writeStream`. However, I still have no
idea why using a temp checkpoint location would fail.

On Thu, May 25, 2017 at 2:23 PM, kant kodali <kanth...@gmail.com> wrote:

> I did the following:
>
> bin/hadoop fs -mkdir -p /usr/local/hadoop/checkpoint
> bin/hadoop fs -ls /
>
> I can see /tmp and /usr, and inside /usr there is indeed
> local/hadoop/checkpoint.
>
> So up to here it looks fine.
>
> I also cleared everything under /tmp/*, as @Michael suggested, using
> bin/hadoop fs -rmdir, so that bin/hadoop fs -ls /tmp shows nothing.
>
> Then I ran my Spark driver program using spark-submit, and it failed with
> the following exception:
>
> File does not exist:
> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta
>
> So I ran
>
> bin/hadoop fs -ls /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2
>
> and did not see anything like 1.delta there (there are no files at all),
> although the directory
> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2
> itself does exist.
>
> For my checkpointLocation I have tried both /usr/local/hadoop/checkpoint
> and hdfs:///usr/local/hadoop/checkpoint so far, and neither worked. It
> fails with the same error: "File does not exist:
> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta"
>
> So what can be the problem? Any ideas?
>
> Thanks!
>
> On Thu, May 25, 2017 at 1:31 AM, kant kodali <kanth...@gmail.com> wrote:
>
>> Executing this bin/hadoop fs -ls /usr/local/hadoop/checkpoint says
>>
>> ls: `/usr/local/hadoop/checkpoint': No such file or directory
>>
>> This is what I expected as well since I don't see any checkpoint
>> directory under /usr/local/hadoop. Am I missing any configuration variable
>> like HADOOP_CONF_DIR ? I am currently not setting that in
>> conf/spark-env.sh and thats the only hadoop related environment variable I
>> see. please let me know
>>
>> thanks!
>>
>>
>>
>> On Thu, May 25, 2017 at 1:19 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi Ryan,
>>>
>>> I did add that print statement and here is what I got.
>>>
>>> class org.apache.hadoop.hdfs.DistributedFileSystem
>>>
>>> Thanks!
>>>
>>> On Wed, May 24, 2017 at 11:39 PM, Shixiong(Ryan) Zhu <
>>> shixi...@databricks.com> wrote:
>>>
>>>> I meant using HDFS command to check the directory. Such as "bin/hadoop
>>>> fs -ls /usr/local/hadoop/checkpoint". My hunch is the default file system
>>>> in driver probably is the local file system. Could you add the following
>>>> line into your code to print the default file system?
>>>>
>>>> println(org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfiguration).getClass)
>>>>
>>>> On Wed, May 24, 2017 at 5:59 PM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as you
>>>>> can see below however I dont see checkpoint directory under my 
>>>>> hadoop_home=
>>>>> /usr/local/hadoop in either datanodes or namenodes however in
>>>>> datanode machine there seems to be some data under
>>>>>
>>>>> /usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.X-1495672725898/current/finalized/subdir0/subdir0
>>>>>
>>>>> I thought the checkpoint directory will be created by spark once I
>>>>> specify the path but do I manually need to create checkpoint dir using
>>>>> mkdir in all spark worker machines? I am new to HDFS as well so please let
>>>>> me know. I can try sending df.explain("true") but I have 100 fields in my
>>>>> schema so Project looks really big and if this is not a problem for you
>>>>> guys I can send that as well.
>>>>>
>>>>>
>>>>>   +- StreamingRelation 
>>>>> DataSource(org.apache.spark.sql.SparkSession@21002393,kafka,List(),None,List(),None,Map(subscribe
>>>>>  -> analytics2, failOnDataLoss -> false, kafka.bootstrap.servers -> 
>>>>> X.X.X.X:9092, checkpointLocation -> 
>>>>> /usr/local/hadoop/che

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread Shixiong(Ryan) Zhu
toBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>
> at org.apache.hadoop.ipc.Client.call(Client.java:1475)
> at org.apache.hadoop.ipc.Client.call(Client.java:1412)
> at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
> at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
> at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
> at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
> at 
> org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
>
>
> On Wed, May 24, 2017 at 4:29 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> What's the value of "hdfsCheckPointDir"? Could you list this directory on
>> HDFS and report the files there?
>>
>> On Wed, May 24, 2017 at 3:50 PM, Michael Armbrust <mich...@databricks.com
>> > wrote:
>>
>>> -dev
>>>
>>> Have you tried clearing out the checkpoint directory?  Can you also give
>>> the full stack trace?
>>>
>>> On Wed, May 24, 2017 at 3:45 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Even if I do simple count aggregation like below I get the same error
>>>> as https://issues.apache.org/jira/browse/SPARK-19268
>>>>
>>>> Dataset df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 
>>>> hours", "24 hours"), df1.col("AppName")).count();
>>>>
>>>>
>>>> On Wed, May 24, 2017 at 3:35 PM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am using Spark 2.1.1 and running in a Standalone mode using HDFS and
>>>>> Kafka
>>>>>
>>>>> I am running into the same problem as
>>>>> https://issues.apache.org/jira/browse/SPARK-19268 with my app (not
>>>>> KafkaWordCount).
>>>>>
>>>>> Here is my sample code
>>>>>
>>>>> *Here is how I create ReadStream*
>>>>>
>>>>> sparkSession.readStream()
>>>>> .format("kafka")
>>>>> .option("kafka.bootstrap.servers", 
>>>>> config.getString("kafka.consumer.settings.bootstrapServers"))
>>>>> .option("subscribe", 
>>>>> config.getString("kafka.consumer.settings.topicName"))
>>>>> .option("startingOffsets", "earliest")
>>>>> .option("failOnDataLoss", "false")
>>>>> .option("checkpointLocation", hdfsCheckPointDir)
>>>>> .load();
>>>>>
>>>>>
>>>>> *The core logic*
>>>>>
>>>>> Dataset df = ds.select(from_json(new Column("value").cast("string"), 
>>>>> client.getSchema()).as("payload"));
>>>>> Dataset df1 = df.selectExpr("payload.info.*", "payload.data.*");
>>>>> Dataset df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", 
>>>>> "24 hours"), df1.col("AppName")).agg(sum("Amount"));
>>>>> StreamingQuery query = df1.writeStream().foreach(new 
>>>>> KafkaSink()).outputMode("update").start();
>>>>> query.awaitTermination();
>>>>>
>>>>>
>>>>> I can also provide any other information you may need.
>>>>>
>>>>> Thanks!
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Running into the same problem as JIRA SPARK-19268

2017-05-24 Thread Shixiong(Ryan) Zhu
What's the value of "hdfsCheckPointDir"? Could you list this directory on
HDFS and report the files there?

On Wed, May 24, 2017 at 3:50 PM, Michael Armbrust 
wrote:

> -dev
>
> Have you tried clearing out the checkpoint directory?  Can you also give
> the full stack trace?
>
> On Wed, May 24, 2017 at 3:45 PM, kant kodali  wrote:
>
>> Even if I do simple count aggregation like below I get the same error as
>> https://issues.apache.org/jira/browse/SPARK-19268
>>
>> Dataset df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 
>> hours", "24 hours"), df1.col("AppName")).count();
>>
>>
>> On Wed, May 24, 2017 at 3:35 PM, kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> I am using Spark 2.1.1 and running in a Standalone mode using HDFS and
>>> Kafka
>>>
>>> I am running into the same problem as
>>> https://issues.apache.org/jira/browse/SPARK-19268 with my app (not
>>> KafkaWordCount).
>>>
>>> Here is my sample code
>>>
>>> *Here is how I create ReadStream*
>>>
>>> sparkSession.readStream()
>>> .format("kafka")
>>> .option("kafka.bootstrap.servers", 
>>> config.getString("kafka.consumer.settings.bootstrapServers"))
>>> .option("subscribe", 
>>> config.getString("kafka.consumer.settings.topicName"))
>>> .option("startingOffsets", "earliest")
>>> .option("failOnDataLoss", "false")
>>> .option("checkpointLocation", hdfsCheckPointDir)
>>> .load();
>>>
>>>
>>> *The core logic*
>>>
>>> Dataset df = ds.select(from_json(new Column("value").cast("string"), 
>>> client.getSchema()).as("payload"));
>>> Dataset df1 = df.selectExpr("payload.info.*", "payload.data.*");
>>> Dataset df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", 
>>> "24 hours"), df1.col("AppName")).agg(sum("Amount"));
>>> StreamingQuery query = df1.writeStream().foreach(new 
>>> KafkaSink()).outputMode("update").start();
>>> query.awaitTermination();
>>>
>>>
>>> I can also provide any other information you may need.
>>>
>>> Thanks!
>>>
>>
>>
>


Re: How to print data to console in structured streaming using Spark 2.1.0?

2017-05-16 Thread Shixiong(Ryan) Zhu
The default "startingOffsets" is "latest". If you don't push any data after
starting the query, it won't fetch anything. You can set it to "earliest"
with `.option("startingOffsets", "earliest")` to start the stream from the
beginning.

On Tue, May 16, 2017 at 12:36 AM, kant kodali  wrote:

> Hi All,
>
> I have the following code.
>
>  val ds = sparkSession.readStream()
> .format("kafka")
> .option("kafka.bootstrap.servers", bootstrapServers)
> .option("subscribe", topicName)
> .option("checkpointLocation", hdfsCheckPointDir)
> .load();
>
>  val ds1 = ds.select($"value")
>  val query = ds1.writeStream.outputMode("append").format("console").start()
>  query.awaitTermination()
>
> There are no errors when I execute this code however I don't see any data
> being printed out to console? When I run my standalone test Kafka consumer
> jar I can see that it is receiving messages. so I am not sure what is going
> on with above code? any ideas?
>
> Thanks!
>


Re: Application dies, Driver keeps on running

2017-05-15 Thread Shixiong(Ryan) Zhu
So you are using `client` mode, right? If so, the Spark cluster doesn't
manage the driver for you. Did you see any error logs in the driver?

On Mon, May 15, 2017 at 3:01 PM, map reduced  wrote:

> Hi,
>
> Setup: Standalone cluster with 32 workers, 1 master
> I am running a long running streaming spark job (read from Kafka ->
> process -> send to Http endpoint) which should ideally never stop.
>
> I have 2 questions:
> 1) I have seen some times Driver is still running but application marked
> as *Finished*. *Any idea why this happens or any way to debug this?*
> Sometimes after running for say 2-3 days (or 4-5 days - random timeframe)
> this issue arises, not sure what is causing it. Nothing in logs suggests
> failures or exceptions
>
> 2) Is there a way for Driver to kill itself instead of keeping on running
> without any application to drive?
>
> Thanks,
> KP
>


Re: Why does dataset.union fails but dataset.rdd.union execute correctly?

2017-05-08 Thread Shixiong(Ryan) Zhu
This is because RDD.union doesn't check the schema, so you won't see the
problem until you actually run a job on the RDD that touches the
incompatible column. With an RDD, you may never see an error if you don't
use that column.

Dataset.union requires compatible schemas. You can print ds.schema and
ds1.schema and check whether they are the same.

On Mon, May 8, 2017 at 11:07 AM, Dirceu Semighini Filho <
dirceu.semigh...@gmail.com> wrote:

> Hello,
> I've a very complex case class structure, with a lot of fields.
> When I try to union two datasets of this class, it doesn't work with the
> following error :
> ds.union(ds1)
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Union
> can only be performed on tables with the compatible column types
>
> But when use it's rdd, the union goes right:
> ds.rdd.union(ds1.rdd)
> res8: org.apache.spark.rdd.RDD[
>
> Is there any reason for this to happen (besides a bug ;) )
>
>
>


Re: Upgrade the scala code using the most updated Spark version

2017-03-28 Thread Shixiong(Ryan) Zhu
mapPartitionsWithSplit was removed in Spark 2.0.0. You can
use mapPartitionsWithIndex instead.

On Tue, Mar 28, 2017 at 3:52 PM, Anahita Talebi 
wrote:

> Thanks.
> I tried this one, as well. Unfortunately I still get the same error.
>
>
> On Wednesday, March 29, 2017, Marco Mistroni  wrote:
>
>> 1.7.5
>>
>> On 28 Mar 2017 10:10 pm, "Anahita Talebi" 
>> wrote:
>>
>>> Hi,
>>>
>>> Thanks for your answer.
>>> What is the version of "org.slf4j" % "slf4j-api" in your sbt file?
>>> I think the problem might come from this part.
>>>
>>> On Tue, Mar 28, 2017 at 11:02 PM, Marco Mistroni 
>>> wrote:
>>>
>>>> Hello,
>>>> I have a project whose build.sbt is close to yours, where I am using
>>>> Spark 2.1, Scala 2.11 and scalatest (I upgraded to 3.0.0), and it works
>>>> fine. In my project, though, I don't have any of the following libraries
>>>> that you mention:
>>>> - breeze
>>>> - netlib all
>>>> - scopt
>>>>
>>>> hth
>>>>
>>>> On Tue, Mar 28, 2017 at 9:10 PM, Anahita Talebi <
>>>> anahita.t.am...@gmail.com> wrote:

> Hi,
>
> Thanks for your answer.
>
> I first changed the scala version to 2.11.8 and kept the spark version
> 1.5.2 (old version). Then I changed the scalatest version into "3.0.1".
> With this configuration, I could run the code and compile it and generate
> the .jar file.
>
> When I changed the spark version into 2.1.0, I get the same error as
> before. So I imagine the problem should be somehow related to the version
> of spark.
>
> Cheers,
> Anahita
>
> 
> 
> 
> import AssemblyKeys._
>
> assemblySettings
>
> name := "proxcocoa"
>
> version := "0.1"
>
> organization := "edu.berkeley.cs.amplab"
>
> scalaVersion := "2.11.8"
>
> parallelExecution in Test := false
>
> {
>   val excludeHadoop = ExclusionRule(organization = "org.apache.hadoop")
>   libraryDependencies ++= Seq(
> "org.slf4j" % "slf4j-api" % "1.7.2",
> "org.slf4j" % "slf4j-log4j12" % "1.7.2",
> "org.scalatest" %% "scalatest" % "3.0.1" % "test",
> "org.apache.spark" %% "spark-core" % "2.1.0"
> excludeAll(excludeHadoop),
> "org.apache.spark" %% "spark-mllib" % "2.1.0"
> excludeAll(excludeHadoop),
> "org.apache.spark" %% "spark-sql" % "2.1.0"
> excludeAll(excludeHadoop),
> "org.apache.commons" % "commons-compress" % "1.7",
> "commons-io" % "commons-io" % "2.4",
> "org.scalanlp" % "breeze_2.11" % "0.11.2",
> "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly(),
> "com.github.scopt" %% "scopt" % "3.3.0"
>   )
> }
>
> {
>   val defaultHadoopVersion = "1.0.4"
>   val hadoopVersion =
> scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION",
> defaultHadoopVersion)
>   libraryDependencies += "org.apache.hadoop" % "hadoop-client" %
> hadoopVersion
> }
>
> libraryDependencies += "org.apache.spark" %% "spark-streaming" %
> "2.1.0"
>
> resolvers ++= Seq(
>   "Local Maven Repository" at Path.userHome.asFile.toURI.toURL +
> ".m2/repository",
>   "Typesafe" at "http://repo.typesafe.com/typesafe/releases/",
>   "Spray" at "http://repo.spray.cc/"
> )
>
> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>   {
> case PathList("javax", "servlet", xs @ _*)   =>
> MergeStrategy.first
> case PathList(ps @ _*) if ps.last endsWith ".html"   =>
> MergeStrategy.first
> case "application.conf"  =>
> MergeStrategy.concat
> case "reference.conf"=>
> MergeStrategy.concat
> case "log4j.properties"  =>
> MergeStrategy.discard
> case m if m.toLowerCase.endsWith("manifest.mf")  =>
> MergeStrategy.discard
> case m if m.toLowerCase.matches("meta-inf.*\\.sf$")  =>
> MergeStrategy.discard
> case _ => MergeStrategy.first
>   }
> }
>
> test in assembly := {}
> 
> 
> 
>
> On Tue, Mar 28, 2017 at 9:33 PM, Marco Mistroni 
> wrote:
>
>> Hello
>>  that looks to me like there's something dodgy withyour Scala
>> installation
>> Though Spark 2.0 is built on Scala 2.11, it still support 2.10... i
>> suggest you change one thing at a time in your sbt
>> First Spark version. run it and see if it works
>> Then amend the scala version
>>
>> hth

Re: Failed to connect to master ...

2017-03-07 Thread Shixiong(Ryan) Zhu
The Spark master may bind to a different address. Take a look at this page
to find the correct URL: http://VM_IPAddress:8080/

On Tue, Mar 7, 2017 at 10:13 PM, Mina Aslani  wrote:

> Master and worker processes are running!
>
> On Wed, Mar 8, 2017 at 12:38 AM, ayan guha  wrote:
>
>> You need to start Master and worker processes before connecting to them.
>>
>> On Wed, Mar 8, 2017 at 3:33 PM, Mina Aslani  wrote:
>>
>>> Hi,
>>>
>>> I am writing a spark Transformer in intelliJ in Java and trying to
>>> connect to the spark in a VM using setMaster. I get "Failed to connect to
>>> master ..."
>>>
>>> I get:
>>>
>>> 17/03/07 16:20:55 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master VM_IPAddress:7077
>>> org.apache.spark.SparkException: Exception thrown in awaitResult
>>> at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
>>> at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
>>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>> at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>>
>>> SparkSession spark = SparkSession
>>>   .builder()
>>>   .appName("Java Spark SQL")
>>>   //.master("local[1]")
>>>   .master("spark://VM_IPAddress:7077")
>>>   .getOrCreate();
>>>
>>> Dataset lines = spark
>>>   .readStream()
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", brokers)
>>>   .option("subscribe", topic)
>>>   .load()
>>>   .selectExpr("CAST(value AS STRING)")
>>>   .as(Encoders.STRING());
>>>
>>>
>>>
>>> I get the same error when I try master("spark://spark-master:7077").
>>>
>>> However, with .master("local[1]") no exception is thrown.
>>>
>>> My Kafka is in the same VM and, being new to Spark, I am still trying to
>>> understand:
>>>
>>> - Why do I get the above exception, and how can I fix it (connect to
>>>   Spark in the VM and read from Kafka in the VM)?
>>>
>>> - Why is no exception thrown when using "local[1]", and how do I set it
>>>   up to read from Kafka in the VM?
>>>
>>> - How do I stream from Kafka (the data in the topic is in JSON format)?
>>>
>>> Your input is appreciated!
>>>
>>> Best regards,
>>> Mina
>>>
>>>
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


Re: Structured Streaming - Kafka

2017-03-07 Thread Shixiong(Ryan) Zhu
Good catch. Could you create a ticket? You can also submit a PR to fix it
if you have time :)
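
To make the report quoted below concrete: lowercasing the whole option value is harmless for the "latest"/"earliest" keywords, but it also rewrites topic names inside a JSON value. Here is a minimal plain-Java sketch of the effect with no Spark dependency. The class name and the `normalize` helper are hypothetical, made up for illustration; this is one possible approach, not necessarily how Spark fixes it.

```java
import java.util.Locale;

public class StartingOffsetsCase {
    // Hypothetical helper: match the keywords case-insensitively,
    // but return a JSON value untouched so topic names keep their case.
    static String normalize(String raw) {
        String trimmed = raw.trim();
        String lowered = trimmed.toLowerCase(Locale.ROOT);
        if (lowered.equals("latest") || lowered.equals("earliest")) {
            return lowered;
        }
        return trimmed;
    }

    public static void main(String[] args) {
        String json = "{\"MyTopic\":{\"0\":23}}";
        // trim + toLowerCase on the whole value: the topic name changes case.
        System.out.println(json.trim().toLowerCase(Locale.ROOT));
        // Keyword-only lowercasing keeps the topic name as written.
        System.out.println(normalize(json));
        System.out.println(normalize(" Latest "));
    }
}
```

Since Kafka topic names are case-sensitive, the lowercased name no longer matches the assigned partitions, which is consistent with the assertion failure described in the report.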

On Tue, Mar 7, 2017 at 1:52 PM, Bowden, Chris  wrote:

> Potential bug when using startingOffsets = SpecificOffsets with Kafka
> topics containing uppercase characters?
>
> KafkaSourceProvider#L80/86:
>
> val startingOffsets =
>   caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
>     case Some("latest") => LatestOffsets
>     case Some("earliest") => EarliestOffsets
>     case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
>     case None => LatestOffsets
>   }
>
> Topic names in the JSON get lowercased, so the underlying assignments in
> the consumer are incorrect, and the assertion in KafkaSource#L326 triggers:
>
> private def fetchSpecificStartingOffsets(
>     partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
>   val result = withRetriesWithoutInterrupt {
>     // Poll to get the latest assigned partitions
>     consumer.poll(0)
>     val partitions = consumer.assignment()
>     consumer.pause(partitions)
>     assert(partitions.asScala == partitionOffsets.keySet,
>       "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
>         "Use -1 for latest, -2 for earliest, if you don't care.\n" +
>         s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
>
>


Re: Why spark history server does not show RDD even if it is persisted?

2017-02-28 Thread Shixiong(Ryan) Zhu
The REST APIs are not just for the Spark history server. While an
application is running, you can use the same REST APIs to talk to the Spark
UI HTTP server as well.
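
As a rough sketch of what that looks like against a running application: the storage endpoint quoted below sits under the `/api/v1` prefix of the driver UI. The class and method names here are hypothetical, and `http://localhost:4040` is an assumption (the default driver UI address); adjust it to wherever your UI is listening.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StorageRddQuery {
    // Builds <base>/api/v1/applications/<app-id>/storage/rdd.
    static URI storageRddUri(String baseUrl, String appId) {
        String base = baseUrl.endsWith("/")
                ? baseUrl.substring(0, baseUrl.length() - 1)
                : baseUrl;
        return URI.create(base + "/api/v1/applications/" + appId + "/storage/rdd");
    }

    // Fetches the JSON list of stored RDDs; needs the UI to be reachable.
    static String fetch(URI uri) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("usage: StorageRddQuery <app-id>");
            return;
        }
        // Assumed default: driver UI on localhost:4040 (not from this thread).
        URI uri = storageRddUri("http://localhost:4040", args[0]);
        System.out.println(uri);
        System.out.println(fetch(uri));
    }
}
```

Pointing the same request at the history server instead only means changing the base URL.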

On Tue, Feb 28, 2017 at 10:46 AM, Parag Chaudhari 
wrote:

> ping...
>
>
>
> Thanks,
> Parag Chaudhari, USC Alumnus (Fight On!)
> Mobile: (213)-572-7858
> Profile: http://www.linkedin.com/pub/parag-chaudhari/28/a55/254
>
>
> On Wed, Feb 22, 2017 at 7:54 PM, Parag Chaudhari 
> wrote:
>
>> Thanks!
>>
>> If spark does not log these events in event log then why spark history
>> server provides an API to get RDD information?
>>
>> From the documentation,
>>
>> /applications/[app-id]/storage/rdd   A list of stored RDDs for the given
>> application.
>>
>> /applications/[app-id]/storage/rdd/[rdd-id]   Details for the storage
>> status of a given RDD.
>>
>>
>>
>>
>> Thanks,
>> Parag Chaudhari, USC Alumnus (Fight On!)
>> Mobile: (213)-572-7858
>> Profile: http://www.linkedin.com/pub/parag-chaudhari/28/a55/254
>>
>>
>> On Wed, Feb 22, 2017 at 7:44 PM, Saisai Shao 
>> wrote:
>>
>>> It is too verbose, and would significantly increase the size of the
>>> event log.
>>>
>>> Here is the comment in the code:
>>>
>>>> // No-op because logging every update would be overkill
>>>> override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {}


>>> On Thu, Feb 23, 2017 at 11:42 AM, Parag Chaudhari 
>>> wrote:
>>>
>>>> Thanks a lot for the information!
>>>>
>>>> Is there any reason why EventLoggingListener ignores this event?
>>>>
>>>> Thanks,
>>>> Parag
>>>>
>>>> On Wed, Feb 22, 2017 at 7:11 PM, Saisai Shao
>>>> wrote:
> AFAIK, Spark's EventLoggingListener ignores BlockUpdated events, so they
> will not be written into the event log. I think that's why you cannot get
> such info in the history server.
>
> On Thu, Feb 23, 2017 at 9:51 AM, Parag Chaudhari 
> wrote:
>
>> Hi,
>>
>> I am running spark shell in spark version 2.0.2. Here is my program,
>>
>> var myrdd = sc.parallelize(Array.range(1, 10))
>> myrdd.setName("test")
>> myrdd.cache
>> myrdd.collect
>>
>> But I am not able to see any RDD info in "storage" tab in spark
>> history server.
>>
>> I looked at this
>> 
>> but it is not helping as I have exact similar program mentioned there. 
>> Can
>> anyone help?
>>
>>
>> *Thanks,*
>>
>> *​Parag​*
>>
>
>

>>>
>>
>


Re: Fault tolerant broadcast in updateStateByKey

2017-02-07 Thread Shixiong(Ryan) Zhu
It's documented here:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#accumulators-broadcast-variables-and-checkpoints

On Tue, Feb 7, 2017 at 8:12 AM, Amit Sela  wrote:

> Hi all,
>
> I was wondering if anyone ever used a broadcast variable within
> an updateStateByKey op. ? Using it is straight-forward but I was wondering
> how it'll work after resuming from checkpoint (using the rdd.context()
> trick is not possible here) ?
>
> Thanks,
> Amit
>


Re: Spark streaming question - SPARK-13758 Need to use an external RDD inside DStream processing...Please help

2017-02-07 Thread Shixiong(Ryan) Zhu
You can create lazily instantiated singleton instances. See
http://spark.apache.org/docs/latest/streaming-programming-guide.html#accumulators-broadcast-variables-and-checkpoints
for examples of accumulators and broadcast variables. You can use the same
approach to create your cached RDD.
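
A minimal, Spark-free sketch of the lazily instantiated singleton pattern referred to above, in plain Java so that it runs on its own. The names `CachedWords`, `getInstance`, and `loadCount` are hypothetical, and the list of strings only stands in for the cached RDD; in a real job the creation step would presumably build and cache the RDD from the SparkContext.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical holder: the List<String> stands in for a cached RDD.
final class CachedWords {
    // volatile so that double-checked locking publishes the instance safely
    private static volatile List<String> instance;
    // counts how often the expensive load ran (for demonstration only)
    private static final AtomicInteger loads = new AtomicInteger();

    private CachedWords() {}

    static List<String> getInstance() {
        List<String> local = instance;
        if (local == null) {
            synchronized (CachedWords.class) {
                local = instance;
                if (local == null) {
                    loads.incrementAndGet();
                    local = Arrays.asList("apple", "banana"); // assumed sample data
                    instance = local;
                }
            }
        }
        return local;
    }

    static int loadCount() {
        return loads.get();
    }

    public static void main(String[] args) {
        getInstance();
        getInstance();
        System.out.println("loads = " + loadCount());
    }
}
```

Because the holder is a static field and not part of the checkpointed data, it starts out empty again after a restart and is rebuilt on first use, which is the point of the pattern.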

On Tue, Feb 7, 2017 at 10:45 AM, shyla deshpande 
wrote:

> and my cached RDD is not small. If it was maybe I could materialize and
> broadcast.
>
> Thanks
>
> On Tue, Feb 7, 2017 at 10:28 AM, shyla deshpande wrote:
>
>> I have a situation similar to the following and I get SPARK-13758.
>>
>>
>> I understand why I get this error, but I want to know what should be the 
>> approach in dealing with these situations.
>>
>>
>> Thanks
>>
>>
>> > var cached = ssc.sparkContext.parallelize(Seq("apple, banana"))
>> > val words = ssc.socketTextStream(ip, port).flatMap(_.split(" "))
>> > words.foreachRDD((rdd: RDD[String]) => {
>> >   val res = rdd.map(word => (word, word.length)).collect()
>> >   println("words: " + res.mkString(", "))
>> >   cached = cached.union(rdd)
>> >   cached.checkpoint()
>> >   println("cached words: " + cached.collect.mkString(", "))
>> > })
>>
>>
>


Re: Resource Leak in Spark Streaming

2017-01-31 Thread Shixiong(Ryan) Zhu
t; producer = new 
> KafkaProducer<String,String>(properties);
>   return producer;
>    }
>
>    private void initialize() {
>   pool = new ConcurrentLinkedQueue<KafkaProducer<String, String>>();
>
>   for (int i = 0; i < minIdle; i++) {
>  pool.add(createProducer());
>   }
>}
>
>public void closeAll() {
>   KafkaProducer<String, String> object;
>   while ((object = pool.poll()) != null) {
>  //object.flush();
>  object.close();
>   }
>}
> }
>
> Thanks
> Nipun
>
> On Tue, Jan 31, 2017 at 6:09 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Looks like you create the KafkaProducerPool in the driver. So when a task
>> is running in an executor, it will always see a new, empty
>> KafkaProducerPool and create KafkaProducers. But nobody closes these
>> KafkaProducers.
>>
>> On Tue, Jan 31, 2017 at 3:02 PM, Nipun Arora <nipunarora2...@gmail.com>
>> wrote:
>>
>>
>> Sorry for not writing the patch number, it's spark 1.6.1.
>> The relevant code is here inline.
>>
>> Please have a look and let me know if there is a resource leak.
>> Please also let me know if you need any more details.
>>
>> Thanks
>> Nipun
>>
>>
>> The JavaRDDKafkaWriter code is here inline:
>>
>> import org.apache.spark.api.java.JavaRDD;
>> import org.apache.spark.api.java.function.VoidFunction;
>> import scala.Tuple2;
>>
>> import java.io.Serializable;
>> import java.util.Iterator;
>>
>> public class JavaRDDStringKafkaWriter implements Serializable, 
>> VoidFunction<JavaRDD> {
>>
>>private static final long serialVersionUID = -865193912367180261L;
>>private final KafkaProducerPool pool;
>>private final String topic;
>>private final Boolean kafkaAsync;
>>
>>public JavaRDDStringKafkaWriter(final KafkaProducerPool pool, String 
>> topic, Boolean kafkaAsync) {
>>   this.pool = pool;
>>   this.topic = topic;
>>   this.kafkaAsync = kafkaAsync;
>>}
>>
>>@Override
>>public void call(JavaRDD stringJavaRDD) throws Exception {
>>   stringJavaRDD.foreachPartition(new PartitionVoidFunction(
>> new RDDKafkaWriter(pool,kafkaAsync), topic));
>>}
>>
>>private class PartitionVoidFunction implements
>>  VoidFunction<Iterator> {
>>
>>   private static final long serialVersionUID = 8726871215617446598L;
>>   private final RDDKafkaWriter kafkaWriter;
>>   private final String topic;
>>
>>   public PartitionVoidFunction(RDDKafkaWriter kafkaWriter, String topic) 
>> {
>>  this.kafkaWriter = kafkaWriter;
>>  this.topic = topic;
>>   }
>>
>>   @Override
>>   public void call(Iterator iterator) throws Exception {
>>  while (iterator.hasNext()) {
>> kafkaWriter.writeToKafka(topic, iterator.next());
>>  }
>>   }
>>}
>> }
>>
>>
>> The RDDKafkaWriter is here:
>>
>>
>> import java.io.Serializable;
>> import java.util.concurrent.ExecutionException;
>>
>> import org.apache.kafka.clients.producer.KafkaProducer;
>> import org.apache.kafka.clients.producer.ProducerRecord;
>>
>> import scala.Tuple2;
>>
>> public class RDDKafkaWriter implements Serializable {
>>
>>private static final long serialVersionUID = 7374381310562055607L;
>>private final KafkaProducerPool pool;
>>private final Boolean async;
>>
>>public RDDKafkaWriter(final KafkaProducerPool pool, Boolean async) {
>>   this.pool = pool;
>>   this.async = async;
>>
>>}
>>
>>public void writeToKafka(String topic, Tuple2<String, String> message) {
>>   KafkaProducer<String, String> producer = pool.borrowProducer();
>>   ProducerRecord<String, String> record = new ProducerRecord<String, 
>> String>(
>> topic, message._1(), message._2());
>>   if (async) {
>>  producer.send(record);
>>   } else {
>>      try {
>> producer.send(record).get();
>>  } catch (Exception e) {
>> e.printStackTrace();
>>  }
>>   }
>>   pool.returnProducer(producer);
>>}
>>
>> public void writeToKafka(String topic, String message) {
>>
>> KafkaProducer<String,

Re: Resource Leak in Spark Streaming

2017-01-31 Thread Shixiong(Ryan) Zhu
Looks like you create the KafkaProducerPool in the driver. So when a task is
running in an executor, it will always see a new, empty KafkaProducerPool
and create KafkaProducers. But nobody closes these KafkaProducers.
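
A small plain-Java sketch of the mechanism described above, assuming the pool reaches the executor through Java serialization as part of the task closure. `PoolSketch` and `SerializedPoolDemo` are hypothetical stand-ins (an int counter replaces real KafkaProducers); the sketch only shows that the deserialized copy is a separate object, so closing the driver-side pool does not touch what the task created.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for KafkaProducerPool; an int replaces real producers.
final class PoolSketch implements Serializable {
    private static final long serialVersionUID = 1L;
    private int open = 0; // producers created by this copy and not yet closed

    void borrowProducer() {
        open++; // stands in for creating a KafkaProducer
    }

    int closeAll() {
        int closed = open; // number of producers this copy closes
        open = 0;
        return closed;
    }

    int openCount() {
        return open;
    }
}

final class SerializedPoolDemo {
    // Serializes and deserializes the pool, as happens when a closure is shipped in a task.
    static PoolSketch shipToTask(PoolSketch pool) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(pool);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PoolSketch) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        PoolSketch driverPool = new PoolSketch();
        PoolSketch taskCopy = shipToTask(driverPool);
        taskCopy.borrowProducer(); // the task creates a producer in its own copy
        System.out.println("closed by driver: " + driverPool.closeAll());
        System.out.println("still open in task copy: " + taskCopy.openCount());
    }
}
```

One common remedy, offered here only as a suggestion, is to keep the pool in a static, lazily created holder inside the executor JVM so that tasks reuse it and it can be closed there.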

On Tue, Jan 31, 2017 at 3:02 PM, Nipun Arora <nipunarora2...@gmail.com>
wrote:

>
> Sorry for not writing the patch number, it's spark 1.6.1.
> The relevant code is here inline.
>
> Please have a look and let me know if there is a resource leak.
> Please also let me know if you need any more details.
>
> Thanks
> Nipun
>
>
> The JavaRDDKafkaWriter code is here inline:
>
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.VoidFunction;
> import scala.Tuple2;
>
> import java.io.Serializable;
> import java.util.Iterator;
>
> public class JavaRDDStringKafkaWriter implements Serializable, 
> VoidFunction<JavaRDD> {
>
>private static final long serialVersionUID = -865193912367180261L;
>private final KafkaProducerPool pool;
>private final String topic;
>private final Boolean kafkaAsync;
>
>public JavaRDDStringKafkaWriter(final KafkaProducerPool pool, String 
> topic, Boolean kafkaAsync) {
>   this.pool = pool;
>   this.topic = topic;
>   this.kafkaAsync = kafkaAsync;
>}
>
>@Override
>public void call(JavaRDD stringJavaRDD) throws Exception {
>   stringJavaRDD.foreachPartition(new PartitionVoidFunction(
> new RDDKafkaWriter(pool,kafkaAsync), topic));
>}
>
>private class PartitionVoidFunction implements
>  VoidFunction<Iterator> {
>
>   private static final long serialVersionUID = 8726871215617446598L;
>   private final RDDKafkaWriter kafkaWriter;
>   private final String topic;
>
>   public PartitionVoidFunction(RDDKafkaWriter kafkaWriter, String topic) {
>  this.kafkaWriter = kafkaWriter;
>  this.topic = topic;
>   }
>
>   @Override
>   public void call(Iterator iterator) throws Exception {
>  while (iterator.hasNext()) {
> kafkaWriter.writeToKafka(topic, iterator.next());
>  }
>   }
>}
> }
>
>
> The RDDKafkaWriter is here:
>
>
> import java.io.Serializable;
> import java.util.concurrent.ExecutionException;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> import scala.Tuple2;
>
> public class RDDKafkaWriter implements Serializable {
>
>private static final long serialVersionUID = 7374381310562055607L;
>private final KafkaProducerPool pool;
>private final Boolean async;
>
>public RDDKafkaWriter(final KafkaProducerPool pool, Boolean async) {
>   this.pool = pool;
>   this.async = async;
>
>}
>
>public void writeToKafka(String topic, Tuple2<String, String> message) {
>   KafkaProducer<String, String> producer = pool.borrowProducer();
>   ProducerRecord<String, String> record = new ProducerRecord<String, 
> String>(
> topic, message._1(), message._2());
>   if (async) {
>  producer.send(record);
>   } else {
>  try {
> producer.send(record).get();
>  } catch (Exception e) {
> e.printStackTrace();
>  }
>   }
>   pool.returnProducer(producer);
>}
>
> public void writeToKafka(String topic, String message) {
>
> KafkaProducer<String, String> producer = pool.borrowProducer();
> ProducerRecord<String, String> record = new ProducerRecord<String, 
> String>(topic, message);
>
> if (async) {
> producer.send(record);
> } else {
> try {
> producer.send(record).get();
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
> pool.returnProducer(producer);
> }
>
>
> }
>
>
>
>
>
> On Tue, Jan 31, 2017 at 5:20 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Please also include the patch version, such as 1.6.0 or 1.6.1. Could you
>> also post the JAVARDDKafkaWriter code? It is also possible that it leaks
>> resources.
>>
>> On Tue, Jan 31, 2017 at 2:12 PM, Nipun Arora <nipunarora2...@gmail.com>
>> wrote:
>>
>> It is spark 1.6
>>
>> Thanks
>> Nipun
>>
>> On Tue, Jan 31, 2017 at 1:45 PM Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>> Could you provide your Spark version please?
>>
>> On Tue, Jan 31, 2017 

Re: Resource Leak in Spark Streaming

2017-01-31 Thread Shixiong(Ryan) Zhu
Please also include the patch version, such as 1.6.0 or 1.6.1. Could you also
post the JAVARDDKafkaWriter code? It is also possible that it leaks
resources.

On Tue, Jan 31, 2017 at 2:12 PM, Nipun Arora <nipunarora2...@gmail.com>
wrote:

> It is spark 1.6
>
> Thanks
> Nipun
>
> On Tue, Jan 31, 2017 at 1:45 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Could you provide your Spark version please?
>>
>> On Tue, Jan 31, 2017 at 10:37 AM, Nipun Arora <nipunarora2...@gmail.com>
>> wrote:
>>
>> Hi,
>>
>> I get a resource leak, where the number of file descriptors in spark
>> streaming keeps increasing. We eventually end up with a "too many open files"
>> error through an exception caused in:
>>
>> JAVARDDKafkaWriter, which is writing a spark JavaDStream
>>
>> The exception is attached inline. Any help will be greatly appreciated.
>>
>> Thanks
>> Nipun
>>
>> ---
>> Time: 148576253 ms
>> ---
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task 0 in stage 85968.0 failed 1 times, most recent
>> failure: Lost task 0.0 in stage 85968.0 (TID 29562, localhost):
>> java.io.FileNotFoundException: /tmp/blockmgr-1b3ddc44-f9a4-
>> 42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
>> (too many open files)
>> at java.io.FileOutputStream.open(Native Method)
>> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
>> at org.apache.spark.storage.DiskBlockObjectWriter.open(
>> DiskBlockObjectWriter.scala:88)
>> at org.apache.spark.storage.DiskBlockObjectWriter.write(
>> DiskBlockObjectWriter.scala:181)
>> at org.apache.spark.util.collection.WritablePartitionedPairCollect
>> ion$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
>> at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(
>> ExternalSorter.scala:659)
>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(
>> SortShuffleWriter.scala:72)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>> ShuffleMapTask.scala:73)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>> ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> ThreadPoolExecutor.java:1145)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
>> scheduler$DAGScheduler$$failJobAndIndependentStages(
>> DAGScheduler.scala:1431)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
>> DAGScheduler.scala:1419)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
>> DAGScheduler.scala:1418)
>> at scala.collection.mutable.ResizableArray$class.foreach(
>> ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.scheduler.DAGScheduler.abortStage(
>> DAGScheduler.scala:1418)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
>> handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
>> handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>> at scala.Option.foreach(Option.scala:236)
>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(
>> DAGScheduler.scala:799)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
>> doOnReceive(DAGScheduler.scala:1640)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
>> onReceive(DAGScheduler.scala:1599)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
>> onReceive(DAGScheduler.scala:1588)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1883)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.
>> apply(RDD.scala:920)
>> at org.apache.spark.rdd.RDD$$anonfun$fo

Re: Question about "Output Op Duration" in SparkStreaming Batch details UX

2017-01-31 Thread Shixiong(Ryan) Zhu
It means the total time to run a batch, including the Spark job duration +
time spent on the driver. E.g.,

foreachRDD { rdd =>
   rdd.count() // say this takes 1 second.
   Thread.sleep(10000) // sleep 10 seconds
}

In the above example, the Spark job duration is 1 second and the output op
duration is 11 seconds.
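
The same relationship can be sketched without Spark. The plain-Java timing example below is only an illustration: `OutputOpTiming` and `measure` are hypothetical names, and the two sleeps stand in for the Spark job and for the driver-side work inside the foreachRDD body.

```java
// Plain-Java timing sketch; the sleeps stand in for a Spark job and driver-side work.
final class OutputOpTiming {
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns {jobMillis, outputOpMillis} for one simulated output operation.
    static long[] measure(long jobMillis, long driverMillis) {
        long start = System.nanoTime();
        pause(jobMillis);                  // stands in for rdd.count()
        long afterJob = System.nanoTime();
        pause(driverMillis);               // stands in for the sleep on the driver
        long end = System.nanoTime();
        return new long[] {
            (afterJob - start) / 1_000_000,
            (end - start) / 1_000_000
        };
    }

    public static void main(String[] args) {
        long[] d = measure(100, 1000);     // scaled down from 1 s and 10 s
        System.out.println("job ~" + d[0] + " ms, output op ~" + d[1] + " ms");
    }
}
```

The second number always includes the first, which is why the output op duration can be much larger than the job duration when the driver does extra work between or after jobs.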


On Tue, Jan 31, 2017 at 12:56 PM, satishl  wrote:

> For Spark Streaming Apps, what does "Output Op Duration" in the batch
> details
> UX signify?
> We have been observing that - for the given batch's last output Op id -
> Output Op duration > Job duration by a factor. Sometimes it is huge (1
> min).
> I have provided the screenshot below where - you can see that for Output Op
> Id 5, Job Duration is 59ms but output Op duration is 19s. There is no other
> info on where the extra 18.9 seconds were spent.
> On clicking the JobId - there is no breakdown on how this extra time is
> spent.
>
>
> [Screenshot: outputopduration.png]
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Question-about-Output-Op-Duration-in-SparkStreaming-
> Batch-details-UX-tp28354.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Resource Leak in Spark Streaming

2017-01-31 Thread Shixiong(Ryan) Zhu
Could you provide your Spark version please?

On Tue, Jan 31, 2017 at 10:37 AM, Nipun Arora 
wrote:

> Hi,
>
> I get a resource leak, where the number of file descriptors in spark
> streaming keeps increasing. We eventually end up with a "too many open files"
> error through an exception caused in:
>
> JAVARDDKafkaWriter, which is writing a spark JavaDStream
>
> The exception is attached inline. Any help will be greatly appreciated.
>
> Thanks
> Nipun
>
> ---
> Time: 148576253 ms
> ---
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 0 in stage 85968.0 failed 1 times, most recent
> failure: Lost task 0.0 in stage 85968.0 (TID 29562, localhost):
> java.io.FileNotFoundException: /tmp/blockmgr-1b3ddc44-f9a4-
> 42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
> (too many open files)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
> at org.apache.spark.storage.DiskBlockObjectWriter.open(
> DiskBlockObjectWriter.scala:88)
> at org.apache.spark.storage.DiskBlockObjectWriter.write(
> DiskBlockObjectWriter.scala:181)
> at org.apache.spark.util.collection.WritablePartitionedPairCollect
> ion$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
> at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(
> ExternalSorter.scala:659)
> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(
> SortShuffleWriter.scala:72)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:73)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
> scheduler$DAGScheduler$$failJobAndIndependentStages(
> DAGScheduler.scala:1431)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
> DAGScheduler.scala:1419)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
> DAGScheduler.scala:1418)
> at scala.collection.mutable.ResizableArray$class.foreach(
> ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(
> DAGScheduler.scala:1418)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(
> DAGScheduler.scala:799)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> doOnReceive(DAGScheduler.scala:1640)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1599)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1588)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1883)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.
> apply(RDD.scala:920)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.
> apply(RDD.scala:918)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:150)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:111)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
> at org.apache.spark.api.java.JavaRDDLike$class.
> foreachPartition(JavaRDDLike.scala:225)
> at org.apache.spark.api.java.AbstractJavaRDDLike.
> foreachPartition(JavaRDDLike.scala:46)
> at org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(
> JavaRDDStringKafkaWriter.java:25)
> at org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(
> JavaRDDStringKafkaWriter.java:10)
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$
> anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$
> anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$
> foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
> at 

Re: kafka structured streaming source refuses to read

2017-01-27 Thread Shixiong(Ryan) Zhu
Thanks for reporting this. Which Spark version are you using? Could you
provide the full log, please?

On Fri, Jan 27, 2017 at 10:24 AM, Koert Kuipers  wrote:

> i checked my topic. it has 5 partitions but all the data is written to a
> single partition: wikipedia-2
> i turned on debug logging and i see this:
>
> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to
> consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2,
> wikipedia-1]. Seeking to the end.
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
> partition wikipedia-0
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
> partition wikipedia-4
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
> partition wikipedia-3
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
> partition wikipedia-2
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
> partition wikipedia-1
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-0 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-0
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-0 to earliest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-0
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-4 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-4
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-4 to earliest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-4
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-3 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received
> successful heartbeat response for group spark-kafka-source-fac4f749-
> fd56-4a32-82c7-e687aadf520b-1923704552-driver-0
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-3
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-3 to earliest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-3
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-2 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=152908} for partition wikipedia-2
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-2 to earliest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-2
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-1 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-1
> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for
> partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0,
> wikipedia-3 -> 0, wikipedia-0 -> 0)
>
> what is confusing to me is this:
> Resetting offset for partition wikipedia-2 to latest offset.
> Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
> Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0,
> wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)
>
> why does it find latest offset 152908 for wikipedia-2 but then sets latest
> offset to 0 for that partition? or am i misunderstanding?
>
> On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers  wrote:
>
>> code:
>>   val query = spark.readStream
>>     .format("kafka")
>>     .option("kafka.bootstrap.servers", "somenode:9092")
>>     .option("subscribe", "wikipedia")
>>     .load
>>     .select(col("value") cast StringType)
>>     .writeStream
>>     .format("console")
>>     .outputMode(OutputMode.Append)
>>     .start()
>>
>>   while (true) {
>>     Thread.sleep(10000)
>>     println(query.lastProgress)
>>   }
>> }
>>
>> On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman wrote:
>>
>>> lets see the code...
>>>
>>> Alonso Isidoro Roman
>>>
>>> 2017-01-27 5:56 GMT+01:00 Koert Kuipers :
>>>
 my little program prints out query.lastProgress every 10 seconds, and
 this is what it shows:

 {
   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
   "name" : "wiki",
   "timestamp" : 

Re: Using mapWithState without a checkpoint

2017-01-23 Thread Shixiong(Ryan) Zhu
Even if you don't need the checkpointing data for recovery, "mapWithState"
still needs to use "checkpoint" to cut off the RDD lineage.
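
As a rough illustration of why the lineage has to be cut, here is a toy model in plain Java. It is not Spark code: `LineageSketch` and `maxLineageDepth` are hypothetical names, and the model simply assumes that each batch's state depends on the previous batch's state and that a checkpoint resets that chain.

```java
// Toy model only: counts how long the chain of per-batch dependencies gets.
final class LineageSketch {
    // checkpointEvery <= 0 means "never checkpoint".
    static int maxLineageDepth(int batches, int checkpointEvery) {
        int depth = 0;
        int max = 0;
        for (int batch = 1; batch <= batches; batch++) {
            depth++;                       // new state depends on the previous state
            max = Math.max(max, depth);
            if (checkpointEvery > 0 && batch % checkpointEvery == 0) {
                depth = 0;                 // a checkpoint truncates the lineage
            }
        }
        return max;
    }

    public static void main(String[] args) {
        System.out.println("no checkpoint: " + maxLineageDepth(1000, 0));
        System.out.println("every 10 batches: " + maxLineageDepth(1000, 10));
    }
}
```

Without a checkpoint the chain grows with every batch; with one every 10 batches it stays bounded at 10 in this model.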

On Mon, Jan 23, 2017 at 12:30 AM, shyla deshpande 
wrote:

> Hello spark users,
>
> I do have the same question as Daniel.
>
> I would like to save the state in Cassandra and on failure recover using
> the initialState. If some one has already tried this, please share your
> experience and sample code.
>
> Thanks.
>
> On Thu, Nov 17, 2016 at 9:45 AM, Daniel Haviv <
> daniel.ha...@veracity-group.com> wrote:
>
>> Hi,
>> Is it possible to use mapWithState without checkpointing at all ?
>> I'd rather have the whole application fail, restart and reload an
>> initialState RDD than pay for checkpointing every 10 batches.
>>
>> Thank you,
>> Daniel
>>
>
>


Re: Setting startingOffsets to earliest in structured streaming never catches up

2017-01-22 Thread Shixiong(Ryan) Zhu
Which Spark version are you using? If you are using 2.1.0, could you use
the monitoring APIs (
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries)
to check the input rate and the processing rate? One possible issue is that
the Kafka source launched a very large first batch that takes too long to
finish. If so, you can use the "maxOffsetsPerTrigger" option to limit the
amount of data in each batch and observe the progress.
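
For a sense of scale, here is a back-of-the-envelope sketch in plain Java using the figures from the quoted message (4 events/second, 7 days of retention). The cap of 100,000 offsets per trigger is a made-up example value, and the class and method names are hypothetical; the estimate also ignores events that arrive while the job is catching up.

```java
// Back-of-the-envelope backlog estimate; the cap value is a made-up example.
final class BacklogEstimate {
    // Events retained in Kafka at a steady producer rate.
    static long backlogEvents(long eventsPerSecond, long retentionDays) {
        return eventsPerSecond * 86_400L * retentionDays;
    }

    // Ceiling division: triggers needed to drain the backlog at the given cap.
    static long triggersNeeded(long backlog, long maxOffsetsPerTrigger) {
        return (backlog + maxOffsetsPerTrigger - 1) / maxOffsetsPerTrigger;
    }

    public static void main(String[] args) {
        long backlog = backlogEvents(4, 7);
        System.out.println("backlog = " + backlog);
        System.out.println("triggers = " + triggersNeeded(backlog, 100_000));
    }
}
```

With these numbers the backlog is 2,419,200 events, which would be split into 25 capped batches and not read as one very large batch.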

On Sun, Jan 22, 2017 at 10:22 AM, Timothy Chan  wrote:

> I'm running my structured streaming jobs in EMR. We were thinking a worst
> case scenario recovery situation would be to spin up another cluster and
> set startingOffsets to earliest (our Kafka cluster has a retention policy
> of 7 days).
>
> My observation is that the job never catches up to latest. This is not
> acceptable. I've set the number of partitions for the topic to 6. I've
> tried using a cluster of 4 in EMR.
>
> The producer rate for this topic is 4 events/second. Does anyone have any
> suggestions on what I can do to have my consumer catch up to latest faster?
>


Fwd: Spark streaming app that processes Kafka DStreams produces no output and no error

2017-01-20 Thread Shixiong(Ryan) Zhu
-- Forwarded message --
From: Shixiong(Ryan) Zhu <shixi...@databricks.com>
Date: Fri, Jan 20, 2017 at 12:06 PM
Subject: Re: Spark streaming app that processes Kafka DStreams produces no
output and no error
To: shyla deshpande <deshpandesh...@gmail.com>


That's how KafkaConsumer works right now. It will retry forever for network
errors. See https://issues.apache.org/jira/browse/KAFKA-1894

On Thu, Jan 19, 2017 at 8:16 PM, shyla deshpande <deshpandesh...@gmail.com>
wrote:

> There was an issue connecting to Kafka; once that was fixed, the spark app
> works.  Hope this helps someone.
> Thanks
>
> On Mon, Jan 16, 2017 at 7:58 AM, shyla deshpande <deshpandesh...@gmail.com
> > wrote:
>
>> Hello,
>> I checked the log file on the worker node and don't see any errors there.
>> This is the first time I have been asked to run on such a small cluster. I
>> suspect it is a resource issue, but it would be a great help if somebody
>> could confirm this or share their experience. Thanks
>>
>> On Sat, Jan 14, 2017 at 4:01 PM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I want to add that,
>>> I don't even see the streaming tab in the application UI on port 4040
>>> when I run it on the cluster.
>>> The cluster on EC2  has 1 master node and 1 worker node.
>>> The cores used on the worker node is 2 of 2 and memory used is 6GB of
>>> 6.3GB.
>>>
>>> Can I run a spark streaming job with just 2 cores?
>>>
>>> Appreciate your time and help.
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Jan 13, 2017 at 10:46 PM, shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> My spark streaming app that reads kafka topics and prints the DStream
>>>> works fine on my laptop, but on AWS cluster it produces no output and no
>>>> errors.
>>>>
>>>> Please help me debug.
>>>>
>>>> I am using Spark 2.0.2 and kafka-0-10
>>>>
>>>> Thanks
>>>>
>>>> The following is the output of the spark streaming app...
>>>>
>>>>
>>>> 17/01/14 06:22:41 WARN NativeCodeLoader: Unable to load native-hadoop 
>>>> library for your platform... using builtin-java classes where applicable
>>>> 17/01/14 06:22:43 WARN Checkpoint: Checkpoint directory check1 does not 
>>>> exist
>>>> Creating new context
>>>> 17/01/14 06:22:45 WARN SparkContext: Use an existing SparkContext, some 
>>>> configuration may not take effect.
>>>> 17/01/14 06:22:45 WARN KafkaUtils: overriding enable.auto.commit to false 
>>>> for executor
>>>> 17/01/14 06:22:45 WARN KafkaUtils: overriding auto.offset.reset to none 
>>>> for executor
>>>> 17/01/14 06:22:45 WARN KafkaUtils: overriding executor group.id to 
>>>> spark-executor-whilDataStream
>>>> 17/01/14 06:22:45 WARN KafkaUtils: overriding receive.buffer.bytes to 
>>>> 65536 see KAFKA-3135
>>>>
>>>>
>>>>
>>>
>>
>


Re: How to access metrics for Structured Streaming 2.1

2017-01-17 Thread Shixiong(Ryan) Zhu
You can use the monitoring APIs of Structured Streaming to get metrics. See
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries

On Tue, Jan 17, 2017 at 5:01 PM, Heji Kim 
wrote:

> Hello. We are trying to migrate and performance test the kafka sink for
> structured streaming in 2.1. Obviously we miss the beautiful Streaming
> Statistics ui tab and we are trying to figure out the most reasonable way
> to monitor event processing rates and lag time.
>
> 1. Are the SourceStatus and SinkStatus mentioned in the "Metrics in
> Structured Streaming" design doc
> 
>  available with the json or jmx metrics? I can't seem to find any of the
> structured streaming metrics in either sink.
>
> 2. RDD Streaming metrics provides total counts such as
>
> totalProcessedRecords, totalReceivedRecords
>
> Is there anything similar in Structured Streaming?
>
> 3. Are there any plans for supporting something like the Streaming
> Statistics tab for structured streaming in the web UI?  Or if we use
> structured streaming, should we be expected to integrate our own  reporting
> server like ganglia?
>
> Any help is greatly appreciated.
>
> Thanks,
> Heji
>
>
>
>


Re: [Spark Streaming] NoClassDefFoundError : StateSpec

2017-01-12 Thread Shixiong(Ryan) Zhu
You can find the Spark version of spark-submit in the log. Could you check
whether it is consistent with the version in your build (1.6.1)?
On Thu, Jan 12, 2017 at 7:35 AM Ramkumar Venkataraman <
ram.the.m...@gmail.com> wrote:

> Spark: 1.6.1
>
> I am trying to use the new mapWithState API and I am getting the following
> error:
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/spark/streaming/StateSpec$
> Caused by: java.lang.ClassNotFoundException:
> org.apache.spark.streaming.StateSpec$
>
> Build.sbt
> 
> scalaVersion := "2.10.6"
> typelevelDefaultSettings
> val sparkVersion = "1.6.1"
>
> resolvers ++= Seq(
>   "Sonatype OSS Snapshots" at
> "https://oss.sonatype.org/content/repositories/snapshots"
> )
>
> libraryDependencies ++= Seq(
>   "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
>   "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
>   "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion,
>   "com.fasterxml.jackson.core" % "jackson-databind" % "2.3.3" // Needed by
> spark-core
> )
> ==
>
> This is what my spark-submit looks like:
>
> ./bin/spark-submit --verbose --master yarn-client  --num-executors 50
> --driver-memory=4G --executor-memory=8G   --conf
> "spark.driver.extraJavaOptions=-XX:MaxPermSize=6G -XX:+UseConcMarkSweepGC"
> --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -verbose:gc
> -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"  --class MY_DRIVER
> ~/project-assembly-0.0.1-SNAPSHOT.jar
>
> ==
>
> Is there anything I am missing here? I understand that NoClassDefFoundError
> means the required Jars aren't present in the classpath, I am just not able
> to understand why this class alone is missing, when the others related to
> window, etc. are found. Do I have to pass in additional jars to make this
> API work?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-NoClassDefFoundError-StateSpec-tp28301.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: structured streaming polling timeouts

2017-01-11 Thread Shixiong(Ryan) Zhu
No. I think increasing the timeout should work. Spark 2.1.0 changed this
timeout to 120 seconds because we found the default value in 2.0.2 was too small.

On Wed, Jan 11, 2017 at 12:01 PM, Timothy Chan <tc...@lumoslabs.com> wrote:

> We're currently using EMR and they are still on Spark 2.0.2.
>
> Do you have any other suggestions for additional parameters to adjust
> besides "kafkaConsumer.pollTimeoutMs"?
>
> On Wed, Jan 11, 2017 at 11:17 AM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> You can increase the timeout using the option
>> "kafkaConsumer.pollTimeoutMs". In addition, I would recommend you try Spark
>> 2.1.0 as there are many improvements in Structured Streaming.
>>
>> On Wed, Jan 11, 2017 at 11:05 AM, Timothy Chan <tc...@lumoslabs.com>
>> wrote:
>>
>> I'm using Spark 2.0.2 and running a structured streaming query. When I
>> set startingOffsets to earliest I get the following timeout errors:
>>
>> java.lang.AssertionError: assertion failed: Failed to get records for
>> spark-kafka-source-be89d84c-f6e9-4d2b-b6cd-570942dc7d5d-185814897-executor
>> my-favorite-topic-0 1127918 after polling for 2048
>>
>> I do not get these errors when I set startingOffsets to latest.
>>
>>
>>
>>


Re: structured streaming polling timeouts

2017-01-11 Thread Shixiong(Ryan) Zhu
You can increase the timeout using the option
"kafkaConsumer.pollTimeoutMs". In addition, I would recommend you try Spark
2.1.0 as there are many improvements in Structured Streaming.
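
A sketch of where the option goes on the Kafka source. The broker address and the timeout value are illustrative assumptions; the topic and the startingOffsets setting are taken from the question below.

```scala
import org.apache.spark.sql.SparkSession

object KafkaPollTimeoutSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("kafka-poll-timeout").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "my-favorite-topic")
      .option("startingOffsets", "earliest")
      // Time in milliseconds the executor-side consumer waits when polling;
      // 120000 is only an example value.
      .option("kafkaConsumer.pollTimeoutMs", "120000")
      .load()

    val query = df.writeStream.format("console").start()
    query.awaitTermination()
  }
}
```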

On Wed, Jan 11, 2017 at 11:05 AM, Timothy Chan  wrote:

> I'm using Spark 2.0.2 and running a structured streaming query. When I set 
> startingOffsets
> to earliest I get the following timeout errors:
>
> java.lang.AssertionError: assertion failed: Failed to get records for
> spark-kafka-source-be89d84c-f6e9-4d2b-b6cd-570942dc7d5d-185814897-executor
> my-favorite-topic-0 1127918 after polling for 2048
>
> I do not get these errors when I set startingOffsets to latest.
>
>
>


Re: Does MapWithState follow with a shuffle ?

2016-11-29 Thread Shixiong(Ryan) Zhu
Right. And you can specify the partitioner via
"StateSpec.partitioner(partitioner: Partitioner)".
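
For illustration, a sketch of a StateSpec with an explicit partitioner. The key, value and state types, the running-count logic, and the partition count are all assumptions chosen for the example.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.{State, StateSpec}

object StateSpecPartitionerSketch {
  // Keeps a running total per key and emits (key, total).
  val updateCount = (key: String, value: Option[Int], state: State[Long]) => {
    val total = state.getOption.getOrElse(0L) + value.getOrElse(0)
    state.update(total)
    (key, total)
  }

  // 8 partitions is an arbitrary number for the example.
  val spec = StateSpec.function(updateCount).partitioner(new HashPartitioner(8))

  // Usage, given some pairs: DStream[(String, Int)]:
  //   val counts = pairs.mapWithState(spec)
}
```

If the input stream is already partitioned with an equal partitioner, the extra shuffle discussed below should not be needed.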

On Tue, Nov 29, 2016 at 1:16 PM, Amit Sela  wrote:

> Hi all,
>
> I've been digging into MapWithState code (branch 1.6), and I came across
> the compute implementation in *InternalMapWithStateDStream*.
>
> Looking at the defined partitioner, it looks like it could be different
> from the parent RDD partitioner (if defaultParallelism() changed, for
> instance, or input partitioning was smaller to begin with), which will
> eventually create a ShuffledRDD.
>
> Am I reading this right ?
>
> Thanks,
> Amit
>


Re: Pasting into spark-shell doesn't work for Databricks example

2016-11-23 Thread Shixiong(Ryan) Zhu
Scala has not yet resolved this issue. Once they fix it and release a new
version, you can upgrade the Scala version yourself.

On Tue, Nov 22, 2016 at 10:58 PM, Denis Bolshakov <bolshakov.de...@gmail.com>
wrote:

> Hello Zhu,
>
> Thank you very much for such a detailed explanation and for providing a
> workaround; it works fine.
>
> But since the problem is related to a Scala issue, can we expect the fix in
> Spark 2.0? Or is it not a good idea to update such an important dependency
> as Scala in a minor maintenance release?
>
> Kind regards,
> Denis
>
> On 22 November 2016 at 22:13, Shixiong(Ryan) Zhu <shixi...@databricks.com>
> wrote:
>
>> The workaround is defining the imports and class together using ":paste".
>>
>> On Tue, Nov 22, 2016 at 11:12 AM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> This relates to a known issue:
>>> https://issues.apache.org/jira/browse/SPARK-14146 and
>>> https://issues.scala-lang.org/browse/SI-9799
>>>
>>> On Tue, Nov 22, 2016 at 6:37 AM, dbolshak <bolshakov.de...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> We have the same issue.
>>>>
>>>> We use the latest release, 2.0.2.
>>>>
>>>> The same setup with 1.6.1 works fine.
>>>>
>>>> Could somebody provide a workaround?
>>>>
>>>> Kind regards,
>>>> Denis
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Pasting-into-spark-shell-doesn-t-work-for-Databricks-example-tp28113p28116.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>
>>>>
>>>
>>
>
>
> --
> //with Best Regards
> --Denis Bolshakov
> e-mail: bolshakov.de...@gmail.com
>


Re: getting error on spark streaming : java.lang.OutOfMemoryError: unable to create new native thread

2016-11-22 Thread Shixiong(Ryan) Zhu
Possibly https://issues.apache.org/jira/browse/SPARK-17396

On Tue, Nov 22, 2016 at 1:42 PM, Mohit Durgapal 
wrote:

> Hi Everyone,
>
>
> I am getting the following error while running a Spark Streaming example
> on my local machine; the data being ingested is only 506 KB.
>
>
> *16/11/23 03:05:54 INFO MappedDStream: Slicing from 1479850537180 ms to
> 1479850537235 ms (aligned to 1479850537180 ms and 1479850537235 ms)*
>
> *Exception in thread "streaming-job-executor-0"
> java.lang.OutOfMemoryError: unable to create new native thread*
>
>
> I looked it up and found that it could be related to ulimit. I even
> increased the ulimit to 1, but I still get the same error.
>
>
> Regards
>
> Mohit
>


Re: Pasting into spark-shell doesn't work for Databricks example

2016-11-22 Thread Shixiong(Ryan) Zhu
The workaround is defining the imports and class together using ":paste".
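
A sketch of what that looks like in spark-shell. Type `:paste` first, paste the whole block, then press ctrl-D so the import and the class are compiled as one unit. The `Record` class and the data are made up for illustration and are not the Databricks example from the thread.

```scala
// Inside spark-shell, after typing :paste
import spark.implicits._

// Hypothetical class, for illustration only.
case class Record(id: Long, name: String)

val ds = Seq(Record(1L, "a"), Record(2L, "b")).toDS()
ds.show()
// Press ctrl-D here to leave paste mode and run the block.
```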

On Tue, Nov 22, 2016 at 11:12 AM, Shixiong(Ryan) Zhu <
shixi...@databricks.com> wrote:

> This relates to a known issue:
> https://issues.apache.org/jira/browse/SPARK-14146 and
> https://issues.scala-lang.org/browse/SI-9799
>
> On Tue, Nov 22, 2016 at 6:37 AM, dbolshak <bolshakov.de...@gmail.com>
> wrote:
>
>> Hello,
>>
>> We have the same issue.
>>
>> We use the latest release, 2.0.2.
>>
>> The same setup with 1.6.1 works fine.
>>
>> Could somebody provide a workaround?
>>
>> Kind regards,
>> Denis
>>
>>
>>
>>
>>
>


Re: Pasting into spark-shell doesn't work for Databricks example

2016-11-22 Thread Shixiong(Ryan) Zhu
This relates to a known issue:
https://issues.apache.org/jira/browse/SPARK-14146 and
https://issues.scala-lang.org/browse/SI-9799

On Tue, Nov 22, 2016 at 6:37 AM, dbolshak  wrote:

> Hello,
>
> We have the same issue.
>
> We use the latest release, 2.0.2.
>
> The same setup with 1.6.1 works fine.
>
> Could somebody provide a workaround?
>
> Kind regards,
> Denis
>
>
>
>
>


Re: HiveContext.getOrCreate not accessible

2016-11-17 Thread Shixiong(Ryan) Zhu
`SQLContext.getOrCreate` will return the HiveContext you created.
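
A minimal sketch of that pattern, assuming a Spark 1.x application. The object and method names are hypothetical; only `new HiveContext(sc)` and `SQLContext.getOrCreate(sc)` are Spark API calls.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext

object HiveContextSketch {
  // Call once at startup, before any batch runs.
  def init(sc: SparkContext): HiveContext = new HiveContext(sc)

  // Call inside foreachRDD in place of HiveContext.getOrCreate.
  // Per the reply above, this is expected to hand back the HiveContext
  // created in init; the static type is SQLContext, so cast or pattern
  // match if Hive-specific methods are needed.
  def current(sc: SparkContext): SQLContext = SQLContext.getOrCreate(sc)
}
```

One caveat, stated as an assumption: if the driver restarts from a checkpoint and the setup code does not run again, no HiveContext exists yet, so it would have to be created before the first batch.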

On Mon, Nov 14, 2016 at 11:17 PM, Praseetha  wrote:

>
> Hi All,
>
>
> I have a streaming app, and when I try invoking
> HiveContext.getOrCreate, it fails with the following error: 'object
> HiveContext in package hive cannot be accessed in package
> org.apache.spark.sql.hive'
>
> I require a HiveContext instead of a SQLContext for my application, and
> creating a new HiveContext every time would not be a feasible solution.
>
> Here is my code snippet:
> object sampleStreamingApp {
>
>   def createStreamingContext(checkpointDirectory: String): StreamingContext = {
>     val conf = new SparkConf().setAppName("sampleStreaming")
>     val sc = new SparkContext(conf)
>     val ssc = new StreamingContext(sc, Milliseconds(5000))
>     ssc.checkpoint(checkpointDirectory)
>     val smDStream = ssc.textFileStream("/user/hdpuser/data")
>     val smSplitted = smDStream.map(x => x.split(";")).map(x => Row.fromSeq(x))
>     smSplitted.foreachRDD { rdd =>
>       val sqlContext = HiveContext.getOrCreate(rdd.sparkContext)
>       import sqlContext.implicits._
>
>     }
>     }
>     ssc
>   }
>
>   def main(args: Array[String]) {
>     val checkpointDirectory = "hdfs://localhost:8020/user/dfml/checkpointing/AAA"
>     val ssc = StreamingContext.getActiveOrCreate(checkpointDirectory, () =>
>       createStreamingContext(checkpointDirectory))
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> Any help would be appreciated.
>
> Regds,
> --Praseetha
>


Re: Spark 2.0.2 with Kafka source, Error please help!

2016-11-17 Thread Shixiong(Ryan) Zhu
ptor does 
>> not match message type.")
>> __field.getNumber match {
>>   case 3 => com.example.protos.demo.Gender
>> }
>>   }
>>   lazy val defaultInstance = com.example.protos.demo.Person(
>>   )
>>   implicit class PersonLens[UpperPB](_l: com.trueaccord.lenses.Lens[UpperPB, com.example.protos.demo.Person]) extends com.trueaccord.lenses.ObjectLens[UpperPB, com.example.protos.demo.Person](_l) {
>>     def name: com.trueaccord.lenses.Lens[UpperPB, String] = field(_.getName)((c_, f_) => c_.copy(name = Some(f_)))
>>     def optionalName: com.trueaccord.lenses.Lens[UpperPB, scala.Option[String]] = field(_.name)((c_, f_) => c_.copy(name = f_))
>>     def age: com.trueaccord.lenses.Lens[UpperPB, Int] = field(_.getAge)((c_, f_) => c_.copy(age = Some(f_)))
>>     def optionalAge: com.trueaccord.lenses.Lens[UpperPB, scala.Option[Int]] = field(_.age)((c_, f_) => c_.copy(age = f_))
>>     def gender: com.trueaccord.lenses.Lens[UpperPB, com.example.protos.demo.Gender] = field(_.getGender)((c_, f_) => c_.copy(gender = Some(f_)))
>>     def optionalGender: com.trueaccord.lenses.Lens[UpperPB, scala.Option[com.example.protos.demo.Gender]] = field(_.gender)((c_, f_) => c_.copy(gender = f_))
>>     def tags: com.trueaccord.lenses.Lens[UpperPB, scala.collection.Seq[String]] = field(_.tags)((c_, f_) => c_.copy(tags = f_))
>>     def addresses: com.trueaccord.lenses.Lens[UpperPB, scala.collection.Seq[com.example.protos.demo.Address]] = field(_.addresses)((c_, f_) => c_.copy(addresses = f_))
>>   }
>>   final val NAME_FIELD_NUMBER = 1
>>   final val AGE_FIELD_NUMBER = 2
>>   final val GENDER_FIELD_NUMBER = 3
>>   final val TAGS_FIELD_NUMBER = 4
>>   final val ADDRESSES_FIELD_NUMBER = 5
>> }
>>
>>
>> On Wed, Nov 16, 2016 at 1:28 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Could you provide the Person class?
>>>
>>> On Wed, Nov 16, 2016 at 1:19 PM, shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
>>>> I am using 2.11.8. Thanks
>>>>
>>>> On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu <
>>>> shixi...@databricks.com> wrote:
>>>>
>>>>> Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has
>>>>> some known race conditions in reflection, and the Scala community has
>>>>> no plans to fix them
>>>>> (http://docs.scala-lang.org/overviews/reflection/thread-safety.html).
>>>>> AFAIK, the only fix is to upgrade to Scala 2.11.
>>>>>
>>>>> On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande <
>>>>> deshpandesh...@gmail.com> wrote:
>>>>>
>>>>>> I am using protobuf to encode. This may not be related to the new
>>>>>> release issue
>>>>>>
>>>>>> Exception in thread "main" scala.ScalaReflectionException: <none> is not a term
>>>>>> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
>>>>>> at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:811)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(ScalaReflection.scala:39)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:800)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:39)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:582)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:460)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592)
>>>>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583)
>>>&

Re: Spark 2.0.2 with Kafka source, Error please help!

2016-11-16 Thread Shixiong(Ryan) Zhu
Could you provide the Person class?

On Wed, Nov 16, 2016 at 1:19 PM, shyla deshpande <deshpandesh...@gmail.com>
wrote:

> I am using 2.11.8. Thanks
>
> On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has some
>> known race conditions in reflection, and the Scala community has no plans
>> to fix them
>> (http://docs.scala-lang.org/overviews/reflection/thread-safety.html).
>> AFAIK, the only fix is to upgrade to Scala 2.11.
>>
>> On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> I am using protobuf to encode. This may not be related to the new
>>> release issue
>>>
>>> Exception in thread "main" scala.ScalaReflectionException: <none> is not a term
>>> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
>>> at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:811)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(ScalaReflection.scala:39)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:800)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:39)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:582)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:460)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583)
>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
>>> at scala.collection.immutable.List.flatMap(List.scala:344)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:583)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425)
>>> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61)
>>> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
>>> at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47)
>>> at PersonConsumer$.main(PersonConsumer.scala:33)
>>> at PersonConsumer.main(PersonConsumer.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>>
>>> The following is my code ...
>>>
>>> object PersonConsumer {
>>>   import org.apache.spark.rdd.RDD
>>>   import com.trueaccord.scalapb.spark._
>>>   import org.apache.spark.sql.{SQLContext, SparkSession}
>>>   import com.example.protos.demo._
>>>
>>>   def main(args: Array[String]) {
>>>
>>>     def parseLine(s: String): Person =
>>>       Person.parseFrom(
>>>         org.apache.commons.codec.binary.Base64.decodeBase64(s))
>>>
>>>     val spark = SparkSession.builder
>>>       .master("local")
>>>       .appName("spark session example")
>>>       .getOrCreate()
>>>
>>>     import spark.implicits._
>>>
>>>     val ds1 = spark.readStream
>>>       .format("kafka")
>>>       .option("kafka.bootstrap.servers", "localhost:9092")
>>>       .option("subscribe", "person")
>>>       .load()
>>>
>>>     val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
>>>
>>>     val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")
>>>
>>>     val ds4 = spark.sqlContext.sql("select name from persons")
>>>
>>>     val query = ds4.writeStream
>>>       .outputMode("append")
>>>       .format("console")
>>>       .start()
>>>     query.awaitTermination()
>>>   }
>>> }
>>>
>>>
>>
>


Re: Spark 2.0.2 with Kafka source, Error please help!

2016-11-16 Thread Shixiong(Ryan) Zhu
Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has some
known race conditions in reflection, and the Scala community has no plans
to fix them
(http://docs.scala-lang.org/overviews/reflection/thread-safety.html). AFAIK,
the only fix is to upgrade to Scala 2.11.
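
If it is unclear which Scala version the job actually runs on, a small check like the following can be dropped into the driver. The object and helper names are made up for the example; `scala.util.Properties.versionNumberString` is part of the standard library.

```scala
object ScalaVersionCheck {
  // "2.11.8" -> "2.11": the major.minor pair decides binary compatibility.
  def binaryVersion(full: String): String =
    full.split('.').take(2).mkString(".")

  def main(args: Array[String]): Unit = {
    val full = scala.util.Properties.versionNumberString
    println(s"Scala runtime: $full (binary ${binaryVersion(full)})")
  }
}
```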

On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande 
wrote:

> I am using protobuf to encode. This may not be related to the new release
> issue
>
> Exception in thread "main" scala.ScalaReflectionException: <none> is not a term
> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
> at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84)
> at org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:811)
> at org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(ScalaReflection.scala:39)
> at org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:800)
> at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:39)
> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:582)
> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:460)
> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592)
> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
> at scala.collection.immutable.List.flatMap(List.scala:344)
> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:583)
> at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425)
> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61)
> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
> at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47)
> at PersonConsumer$.main(PersonConsumer.scala:33)
> at PersonConsumer.main(PersonConsumer.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>
> The following is my code ...
>
> object PersonConsumer {
>   import org.apache.spark.rdd.RDD
>   import com.trueaccord.scalapb.spark._
>   import org.apache.spark.sql.{SQLContext, SparkSession}
>   import com.example.protos.demo._
>
>   def main(args: Array[String]) {
>
>     def parseLine(s: String): Person =
>       Person.parseFrom(
>         org.apache.commons.codec.binary.Base64.decodeBase64(s))
>
>     val spark = SparkSession.builder
>       .master("local")
>       .appName("spark session example")
>       .getOrCreate()
>
>     import spark.implicits._
>
>     val ds1 = spark.readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", "localhost:9092")
>       .option("subscribe", "person")
>       .load()
>
>     val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
>
>     val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")
>
>     val ds4 = spark.sqlContext.sql("select name from persons")
>
>     val query = ds4.writeStream
>       .outputMode("append")
>       .format("console")
>       .start()
>     query.awaitTermination()
>   }
> }
>
>


Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-10 Thread Shixiong(Ryan) Zhu
Yeah, the KafkaRDD cannot be reused. It's better to document it.

On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy  wrote:

> Ok, I have split the KafkaRDD logic so that each uses its own group and
> bumped the poll.ms to 10 seconds. Anything less than 2 seconds on the
> poll.ms ends up with a timeout and exception, so I am still perplexed on
> that one. The new error I am getting now is a
> `ConcurrentModificationException` when Spark is trying to remove the
> CachedKafkaConsumer.
>
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)
>
> Here is the basic logic:
>
> *Using KafkaRDD* - This takes a list of channels and processes them in
> parallel using the KafkaRDD directly. They each use a distinct consumer
> group (s"$prefix-$topic"), each has its own topic, and each topic has
> 4 partitions. We routinely get timeout errors when polling for data when
> the poll.ms is less than 2 seconds. This occurs whether or not we process
> in parallel.
>
> *Example usage with KafkaRDD:*
> val channels = Seq("channel1", "channel2")
>
> channels.toParArray.foreach { channel =>
>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>
>   // Get offsets for the given topic and the consumer group "$prefix-$topic"
>   val offsetRanges = getOffsets(s"$prefix-$topic", channel)
>
>   val ds = KafkaUtils.createRDD[K, V](context,
>     kafkaParams.asJava,
>     offsetRanges,
>     PreferConsistent).toDS[V]
>
>   // Do some aggregations
>   ds.agg(...)
>   // Save the data
>   ds.write.mode(SaveMode.Append).parquet(somePath)
>   // Save offsets using a KafkaConsumer
>   consumer.commitSync(newOffsets.asJava)
>   consumer.close()
> }
>
> I am not sure why the concurrency issue is there. I have tried to debug it
> and have also looked at the KafkaConsumer code, and from what I can see it
> should not occur. The things to figure out are why this occurs when running
> in parallel and why the timeouts still occur.
>
> Thanks,
>
> Ivan
>
> On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger 
> wrote:
>
>> There definitely is Kafka documentation indicating that you should use
>> a different consumer group for logically different subscribers, this
>> is really basic to Kafka:
>>
>> http://kafka.apache.org/documentation#intro_consumers
>>
>> As for your comment that "commit async after each RDD, which is not
>> really viable also", how is it not viable?  Again, committing offsets
>> to Kafka doesn't give you reliable delivery semantics unless your
>> downstream data store is idempotent.  If your downstream data store is
>> idempotent, then it shouldn't matter to you when offset commits
>> happen, as long as they happen within a reasonable time after the data
>> is written.
>>
>> Do you want to keep arguing with me, or follow my advice and proceed
>> with debugging any remaining issues after you make the changes I
>> suggested?
>>
>> On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy  wrote:
>> > With our stream version, we update the offsets for only the partition
>> > we are operating on. We even break down the partition into smaller
>> > batches and then update the offsets after each batch within the
>> > partition. With Spark 1.6 and Kafka 0.8.x this was not an issue, and as
>> > Sean pointed out, this is not necessarily a Spark issue since Kafka no
>> > longer allows you to simply update the offsets for a given consumer
>> > group. You have to subscribe or assign partitions to even do so.
>> >
>> > As for storing the offsets in some other place like a DB, I don't find
>> > this useful because you then can't use tools like Kafka Manager. In
>> > order to do so you would have to store in a DB and then circle back and
>> > update Kafka afterwards. This means you have to keep two sources in
>> > sync, which is not really a good idea.
>> >
>> > It is a challenge in Spark to use the Kafka offsets since the driver
>> > keeps subscribed to the topic(s) and consumer group, while the executors
>> > prepend "spark-executor-" to the consumer group. The stream (driver)
>> > does allow you to commit async after each RDD, which is not really
>> > viable also. I have thought of implementing an Akka actor system on the
>> > driver and sending it messages from the executor code to update the
>> > offsets, but then that is asynchronous as well so not really a good
>> > solution.
>> >
>> > I have no idea why Kafka made this change and also why in the parallel
>> > KafkaRDD application we would be advised to use different consumer
>> > groups for each RDD. That

Re: why spark driver program is creating so many threads? How can I limit this number?

2016-11-01 Thread Shixiong(Ryan) Zhu
Yes, try 2.0.1!

On Tue, Nov 1, 2016 at 11:25 AM, kant kodali <kanth...@gmail.com> wrote:

> AH!!! Got it! Should I use 2.0.1 then ? I don't see 2.1.0
>
> On Tue, Nov 1, 2016 at 10:14 AM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> DStream "Window" uses "union" to combine multiple RDDs in one window into
>> a single RDD.
>>
>> On Tue, Nov 1, 2016 at 2:59 AM kant kodali <kanth...@gmail.com> wrote:
>>
>>> @Sean It looks like this problem can happen with other RDD's as well.
>>> Not just unionRDD
>>>
>>> On Tue, Nov 1, 2016 at 2:52 AM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>> Hi Sean,
>>>
>>> The comments seem very relevant, although I am not sure if this pull
>>> request https://github.com/apache/spark/pull/14985 would fix my issue.
>>> I am not sure what UnionRDD.scala has to do with my error (I don't
>>> know much about the Spark code base). Do I ever use UnionRDD.scala when I
>>> call mapToPair, reduceByKey or foreachRDD? This error is very easy to
>>> reproduce; you don't actually need to ingest any data into the Spark
>>> Streaming job. Just have one simple transformation consisting of
>>> mapToPair, reduceByKey and foreachRDD, with a window interval of 1 min
>>> and a batch interval of one second, then simply call
>>> ssc.awaitTermination() and watch the thread count go up significantly.
>>>
>>> I do think that using a fixed-size executor service would probably be a
>>> safer approach. One could leverage ForkJoinPool if they think they could
>>> benefit a lot from the work-stealing algorithm and double-ended queues in
>>> the ForkJoinPool.
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>> On Tue, Nov 1, 2016 at 2:19 AM, Sean Owen <so...@cloudera.com> wrote:
>>>
>>> Possibly https://issues.apache.org/jira/browse/SPARK-17396 ?
>>>
>>> On Tue, Nov 1, 2016 at 2:11 AM kant kodali <kanth...@gmail.com> wrote:
>>>
>>> Hi Ryan,
>>>
>>> I think you are right. This may not be related to the Receiver. I have
>>> attached a jstack dump here. I do a simple mapToPair and reduceByKey, and
>>> I have a window interval of 1 minute (60000 ms) and a batch interval of
>>> 1 s (1000 ms). This is generating a lot of threads, at least 5 to 8
>>> threads per second, and the total number of threads is monotonically
>>> increasing. So, just for tweaking purposes, I changed to a window interval
>>> of 1 min (60000 ms) and a batch interval of 10 s (10000 ms). This looked a
>>> lot better but is still not ideal; at the very least it is not monotonic
>>> anymore (it goes up and down). Now my question really is: how do I tune
>>> this so that my number of threads is optimal while satisfying the window
>>> interval of 1 minute (60000 ms) and batch interval of 1 s (1000 ms)?
>>>
>>> This jstack dump is taken after running my spark driver program for 2
>>> mins and there are about 1000 threads.
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>


Re: why spark driver program is creating so many threads? How can I limit this number?

2016-11-01 Thread Shixiong(Ryan) Zhu
DStream "Window" uses "union" to combine multiple RDDs in one window into a
single RDD.
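
To make the connection to the job in this thread concrete, here is a sketch of a windowed count with the intervals mentioned below (1-second batches, 1-minute window). The socket source, host and port are placeholders. With these settings each window spans about 60 batch RDDs, which is where the union comes in.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

object WindowSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("window-sketch")
    val ssc = new StreamingContext(conf, Seconds(1)) // 1 s batch interval

    // Placeholder source (assumption, for illustration only).
    val lines = ssc.socketTextStream("localhost", 9999)
    val pairs = lines.map(word => (word, 1))

    // 1-minute window sliding every second: each window combines roughly
    // 60 one-second batch RDDs into a single RDD before the reduce.
    val counts = pairs.window(Minutes(1), Seconds(1)).reduceByKey(_ + _)
    counts.foreachRDD(rdd => println(s"keys in window: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }
}
```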
On Tue, Nov 1, 2016 at 2:59 AM kant kodali  wrote:

> @Sean It looks like this problem can happen with other RDD's as well. Not
> just unionRDD
>
> On Tue, Nov 1, 2016 at 2:52 AM, kant kodali  wrote:
>
> Hi Sean,
>
> The comments seem very relevant, although I am not sure if this pull
> request https://github.com/apache/spark/pull/14985 would fix my issue. I
> am not sure what UnionRDD.scala has to do with my error (I don't know
> much about the Spark code base). Do I ever use UnionRDD.scala when I call
> mapToPair, reduceByKey or foreachRDD? This error is very easy to
> reproduce; you don't actually need to ingest any data into the Spark
> Streaming job. Just have one simple transformation consisting of
> mapToPair, reduceByKey and foreachRDD, with a window interval of 1 min and
> a batch interval of one second, then simply call ssc.awaitTermination()
> and watch the thread count go up significantly.
>
> I do think that using a fixed-size executor service would probably be a
> safer approach. One could leverage ForkJoinPool if they think they could
> benefit a lot from the work-stealing algorithm and double-ended queues in
> the ForkJoinPool.
>
> Thanks!
>
>
>
>
> On Tue, Nov 1, 2016 at 2:19 AM, Sean Owen  wrote:
>
> Possibly https://issues.apache.org/jira/browse/SPARK-17396 ?
>
> On Tue, Nov 1, 2016 at 2:11 AM kant kodali  wrote:
>
> Hi Ryan,
>
> I think you are right. This may not be related to the Receiver. I have
> attached a jstack dump here. I do a simple mapToPair and reduceByKey, and I
> have a window interval of 1 minute (60000 ms) and a batch interval of 1s
> (1000 ms). This is generating a lot of threads, at least 5 to 8 threads per
> second, and the total number of threads is monotonically increasing. So just
> for tweaking purposes I changed to a window interval of 1 min (60000 ms) and
> a batch interval of 10s (10000 ms). This looked a lot better but is still not
> ideal; at the very least it is not monotonic anymore (it goes up and down).
> Now my question really is: how do I tune this so that my number of threads
> is optimal while satisfying the window interval of 1 minute (60000 ms) and
> batch interval of 1s (1000 ms)?
>
> This jstack dump is taken after running my spark driver program for 2 mins
> and there are about 1000 threads.
>
> Thanks!
>
>
>
>
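
On the fixed-size executor service idea quoted above: the sketch below is plain JDK code, not anything taken from Spark, and `BoundedPoolDemo` is a made-up name. It only illustrates the property being asked for, namely that a fixed pool never uses more worker threads than its configured size, however many tasks are submitted.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows that a fixed-size pool bounds the number of
// worker threads regardless of how many tasks are queued.
final class BoundedPoolDemo {
    // Runs `tasks` trivial jobs on `poolSize` threads and returns how many
    // distinct worker threads actually executed them.
    static int distinctWorkers(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // stop accepting work; queued tasks still run
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct workers: " + distinctWorkers(4, 100));
    }
}
```

The count returned is at most the pool size. Creating a fresh pool per operation and never shutting it down would give the opposite behaviour, with the thread count growing over time.
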


Re: why spark driver program is creating so many threads? How can I limit this number?

2016-10-31 Thread Shixiong(Ryan) Zhu
If there are leaking threads, you should be able to see the number of
threads increasing. You can just dump the threads after 1-2 hours.
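
If it helps, the thread count can also be sampled from inside the JVM. This is only a sketch: `ThreadCountSampler` is a made-up class, it has to run inside the driver process it is observing, and the sample count and interval are arbitrary. It uses the standard `ThreadMXBean` from `java.lang.management`.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

// Hypothetical helper: samples the live thread count of the current JVM so a
// steady climb becomes visible without waiting for the OutOfMemoryError.
final class ThreadCountSampler {
    static int[] sample(int samples, long intervalMillis) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        int[] counts = new int[samples];
        for (int i = 0; i < samples; i++) {
            counts[i] = threads.getThreadCount(); // live threads, daemon and non-daemon
            if (i < samples - 1) {
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break; // remaining slots stay 0
                }
            }
        }
        return counts;
    }

    // True when every sample is larger than the one before it.
    static boolean strictlyIncreasing(int[] counts) {
        for (int i = 1; i < counts.length; i++) {
            if (counts[i] <= counts[i - 1]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] counts = sample(5, 1_000);
        System.out.println(java.util.Arrays.toString(counts)
            + " strictly increasing: " + strictlyIncreasing(counts));
    }
}
```
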

On Mon, Oct 31, 2016 at 12:59 PM, kant kodali <kanth...@gmail.com> wrote:

> yes I can certainly use jstack but it requires 4 to 5 hours for me to
> reproduce the error so I can get back as early as possible.
>
> Thanks a lot!
>
> On Mon, Oct 31, 2016 at 12:41 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Then it should not be a Receiver issue. Could you use `jstack` to find
>> out the name of leaking threads?
>>
>> On Mon, Oct 31, 2016 at 12:35 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi Ryan,
>>>
>>> It happens on the driver side and I am running on a client mode (not the
>>> cluster mode).
>>>
>>> Thanks!
>>>
>>> On Mon, Oct 31, 2016 at 12:32 PM, Shixiong(Ryan) Zhu <
>>> shixi...@databricks.com> wrote:
>>>
>>>> Sorry, there is a typo in my previous email: this may **not** be the
>>>> root cause if the leak threads are in the driver side.
>>>>
>>>> Does it happen in the driver or executors?
>>>>
>>>> On Mon, Oct 31, 2016 at 12:20 PM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Ryan,
>>>>>
>>>>> Ahh My Receiver.onStop method is currently empty.
>>>>>
>>>>> 1) I have a hard time seeing why the receiver would crash so many times 
>>>>> within a span of 4 to 5 hours but anyways I understand I should still 
>>>>> cleanup during OnStop.
>>>>>
>>>>> 2) How do I clean up those threads? The documentation here 
>>>>> https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html doesn't 
>>>>> seem to have any method where I can clean up the threads created during 
>>>>> OnStart. any ideas?
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>> On Mon, Oct 31, 2016 at 11:58 AM, Shixiong(Ryan) Zhu <
>>>>> shixi...@databricks.com> wrote:
>>>>>
>>>>>> So in your code, each Receiver will start a new thread. Did you stop
>>>>>> the receiver properly in `Receiver.onStop`? Otherwise, you may leak 
>>>>>> threads
>>>>>> after a receiver crashes and is restarted by Spark. However, this may be
>>>>>> the root cause since the leak threads are in the driver side. Could you 
>>>>>> use
>>>>>> `jstack` to check which types of threads are leaking?
>>>>>>
>>>>>> On Mon, Oct 31, 2016 at 11:50 AM, kant kodali <kanth...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I am also under the assumption that the *onStart* function of the
>>>>>>> Receiver is called only once by Spark. Please correct me if I
>>>>>>> am wrong.
>>>>>>>
>>>>>>> On Mon, Oct 31, 2016 at 11:35 AM, kant kodali <kanth...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> My driver program runs a spark streaming job.  And it spawns a
>>>>>>>> thread by itself only in the *onStart()* function below Other than
>>>>>>>> that it doesn't spawn any other threads. It only calls MapToPair,
>>>>>>>> ReduceByKey, forEachRDD, Collect functions.
>>>>>>>>
>>>>>>>> public class NSQReceiver extends Receiver {
>>>>>>>>
>>>>>>>> private String topic="";
>>>>>>>>
>>>>>>>> public NSQReceiver(String topic) {
>>>>>>>> super(StorageLevel.MEMORY_AND_DISK_2());
>>>>>>>> this.topic = topic;
>>>>>>>> }
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public void *onStart()* {
>>>>>>>> new Thread()  {
>>>>>>>> @Override public void run() {
>>>>>>>> receive();
>>>>>>>> }
>>>>>>>> }.start();
>>>>>>>> }
>>>>>>>>
>>>>>>>> }
>>>>>>>>
>>>>

Re: why spark driver program is creating so many threads? How can I limit this number?

2016-10-31 Thread Shixiong(Ryan) Zhu
Then it should not be a Receiver issue. Could you use `jstack` to find out
the name of leaking threads?
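
Once you have the thread names, grouping them by "family" usually makes the leak obvious. A small sketch, assuming thread names like the ones in the error message reported earlier in this thread; `ThreadNameHistogram` is a made-up helper, and replacing digit runs with `#` is just one possible way to bucket the names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: buckets thread names so that numbered siblings such as
// "ForkJoinPool-50-worker-11" and "ForkJoinPool-50-worker-13" count together.
final class ThreadNameHistogram {
    // Replaces every run of digits with '#'.
    static String family(String threadName) {
        return threadName.replaceAll("\\d+", "#");
    }

    static Map<String, Integer> countByFamily(Iterable<String> names) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String name : names) {
            counts.merge(family(name), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Inside the driver JVM the names can be read directly; the same
        // grouping works on names copied out of a jstack dump.
        List<String> live = new ArrayList<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            live.add(t.getName());
        }
        countByFamily(live).forEach((k, v) -> System.out.println(v + "\t" + k));
    }
}
```

The family with the largest and still growing count is the one to look at.
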

On Mon, Oct 31, 2016 at 12:35 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi Ryan,
>
> It happens on the driver side and I am running on a client mode (not the
> cluster mode).
>
> Thanks!
>
> On Mon, Oct 31, 2016 at 12:32 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Sorry, there is a typo in my previous email: this may **not** be the
>> root cause if the leak threads are in the driver side.
>>
>> Does it happen in the driver or executors?
>>
>> On Mon, Oct 31, 2016 at 12:20 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi Ryan,
>>>
>>> Ahh My Receiver.onStop method is currently empty.
>>>
>>> 1) I have a hard time seeing why the receiver would crash so many times 
>>> within a span of 4 to 5 hours but anyways I understand I should still 
>>> cleanup during OnStop.
>>>
>>> 2) How do I clean up those threads? The documentation here 
>>> https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html doesn't 
>>> seem to have any method where I can clean up the threads created during 
>>> OnStart. any ideas?
>>>
>>> Thanks!
>>>
>>>
>>> On Mon, Oct 31, 2016 at 11:58 AM, Shixiong(Ryan) Zhu <
>>> shixi...@databricks.com> wrote:
>>>
>>>> So in your code, each Receiver will start a new thread. Did you stop
>>>> the receiver properly in `Receiver.onStop`? Otherwise, you may leak threads
>>>> after a receiver crashes and is restarted by Spark. However, this may be
>>>> the root cause since the leak threads are in the driver side. Could you use
>>>> `jstack` to check which types of threads are leaking?
>>>>
>>>> On Mon, Oct 31, 2016 at 11:50 AM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> I am also under the assumption that the *onStart* function of the Receiver
>>>>> is called only once by Spark. Please correct me if I am wrong.
>>>>>
>>>>> On Mon, Oct 31, 2016 at 11:35 AM, kant kodali <kanth...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> My driver program runs a spark streaming job.  And it spawns a thread
>>>>>> by itself only in the *onStart()* function below Other than that it
>>>>>> doesn't spawn any other threads. It only calls MapToPair, ReduceByKey,
>>>>>> forEachRDD, Collect functions.
>>>>>>
>>>>>> public class NSQReceiver extends Receiver {
>>>>>>
>>>>>> private String topic="";
>>>>>>
>>>>>> public NSQReceiver(String topic) {
>>>>>> super(StorageLevel.MEMORY_AND_DISK_2());
>>>>>> this.topic = topic;
>>>>>> }
>>>>>>
>>>>>> @Override
>>>>>> public void *onStart()* {
>>>>>> new Thread()  {
>>>>>> @Override public void run() {
>>>>>> receive();
>>>>>> }
>>>>>> }.start();
>>>>>> }
>>>>>>
>>>>>> }
>>>>>>
>>>>>>
>>>>>> Environment info:
>>>>>>
>>>>>> Java 8
>>>>>>
>>>>>> Scala 2.11.8
>>>>>>
>>>>>> Spark 2.0.0
>>>>>>
>>>>>> More than happy to share any other info you may need.
>>>>>>
>>>>>>
>>>>>> On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky <ja...@odersky.com>
>>>>>> wrote:
>>>>>>
>>>>>>>  > how do I tell my spark driver program to not create so many?
>>>>>>>
>>>>>>> This may depend on your driver program. Do you spawn any threads in
>>>>>>> it? Could you share some more information on the driver program,
>>>>>>> spark
>>>>>>> version and your environment? It would greatly help others to help
>>>>>>> you
>>>>>>>
>>>>>>> On Mon, Oct 31, 2016 at 3:47 AM, kant kodali <kanth...@gmail.com>
>>>>>>> wrote:
>>>>>>> > The source of my problem is

Re: why spark driver program is creating so many threads? How can I limit this number?

2016-10-31 Thread Shixiong(Ryan) Zhu
Sorry, there is a typo in my previous email: this may **not** be the root
cause if the leaked threads are on the driver side.

Does it happen in the driver or executors?

On Mon, Oct 31, 2016 at 12:20 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi Ryan,
>
> Ahh My Receiver.onStop method is currently empty.
>
> 1) I have a hard time seeing why the receiver would crash so many times 
> within a span of 4 to 5 hours but anyways I understand I should still cleanup 
> during OnStop.
>
> 2) How do I clean up those threads? The documentation here 
> https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html doesn't seem 
> to have any method where I can clean up the threads created during OnStart. 
> any ideas?
>
> Thanks!
>
>
> On Mon, Oct 31, 2016 at 11:58 AM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> So in your code, each Receiver will start a new thread. Did you stop the
>> receiver properly in `Receiver.onStop`? Otherwise, you may leak threads
>> after a receiver crashes and is restarted by Spark. However, this may be
>> the root cause since the leak threads are in the driver side. Could you use
>> `jstack` to check which types of threads are leaking?
>>
>> On Mon, Oct 31, 2016 at 11:50 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> I am also under the assumption that the *onStart* function of the Receiver
>>> is called only once by Spark. Please correct me if I am wrong.
>>>
>>> On Mon, Oct 31, 2016 at 11:35 AM, kant kodali <kanth...@gmail.com>
>>> wrote:
>>>
>>>> My driver program runs a spark streaming job.  And it spawns a thread
>>>> by itself only in the *onStart()* function below Other than that it
>>>> doesn't spawn any other threads. It only calls MapToPair, ReduceByKey,
>>>> forEachRDD, Collect functions.
>>>>
>>>> public class NSQReceiver extends Receiver {
>>>>
>>>> private String topic="";
>>>>
>>>> public NSQReceiver(String topic) {
>>>> super(StorageLevel.MEMORY_AND_DISK_2());
>>>> this.topic = topic;
>>>> }
>>>>
>>>> @Override
>>>> public void *onStart()* {
>>>> new Thread()  {
>>>> @Override public void run() {
>>>> receive();
>>>> }
>>>> }.start();
>>>> }
>>>>
>>>> }
>>>>
>>>>
>>>> Environment info:
>>>>
>>>> Java 8
>>>>
>>>> Scala 2.11.8
>>>>
>>>> Spark 2.0.0
>>>>
>>>> More than happy to share any other info you may need.
>>>>
>>>>
>>>> On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky <ja...@odersky.com>
>>>> wrote:
>>>>
>>>>>  > how do I tell my spark driver program to not create so many?
>>>>>
>>>>> This may depend on your driver program. Do you spawn any threads in
>>>>> it? Could you share some more information on the driver program, spark
>>>>> version and your environment? It would greatly help others to help you
>>>>>
>>>>> On Mon, Oct 31, 2016 at 3:47 AM, kant kodali <kanth...@gmail.com>
>>>>> wrote:
>>>>> > The source of my problem is actually that I am running into the
>>>>> following
>>>>> > error. This error seems to happen after running my driver program
>>>>> for 4
>>>>> > hours.
>>>>> >
>>>>> > "Exception in thread "ForkJoinPool-50-worker-11" Exception in thread
>>>>> > "dag-scheduler-event-loop" Exception in thread
>>>>> "ForkJoinPool-50-worker-13"
>>>>> > java.lang.OutOfMemoryError: unable to create new native thread"
>>>>> >
>>>>> > and this wonderful book taught me that the error "unable to create
>>>>> new
>>>>> > native thread" can happen because JVM is trying to request the OS
>>>>> for a
>>>>> > thread and it is refusing to do so for the following reasons
>>>>> >
>>>>> > 1. The system has actually run out of virtual memory.
>>>>> > 2. On Unix-style systems, the user has already created (between all
>>>>> programs
>>>>> > user is running) th

Re: why spark driver program is creating so many threads? How can I limit this number?

2016-10-31 Thread Shixiong(Ryan) Zhu
So in your code, each Receiver will start a new thread. Did you stop the
receiver properly in `Receiver.onStop`? Otherwise, you may leak threads
after a receiver crashes and is restarted by Spark. However, this may be
the root cause since the leak threads are in the driver side. Could you use
`jstack` to check which types of threads are leaking?
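
For what "stopping properly" could look like, here is a minimal sketch in plain Java. It deliberately does not extend Spark's `Receiver`; `StoppableWorker` and the thread name are made up, and the `Thread.sleep` stands in for whatever blocking read your NSQ client does. The point is only the pattern: keep a reference to the thread started in `onStart`, and in `onStop` set a flag, interrupt the thread and wait for it to exit.

```java
// Hypothetical stand-in for a custom receiver: same onStart/onStop shape,
// but no Spark dependency, so the lifecycle can be tried in isolation.
final class StoppableWorker {
    private volatile boolean stopped = false;
    private volatile Thread worker;

    // Like onStart(): spawn the receiving thread and return immediately.
    synchronized void onStart() {
        stopped = false;
        Thread t = new Thread(this::receive, "receiver-sketch");
        t.setDaemon(true);
        worker = t;
        t.start();
    }

    // Like onStop(): signal the loop, interrupt any blocking call, wait for exit.
    synchronized void onStop() {
        stopped = true;
        Thread t = worker;
        if (t == null) {
            return;
        }
        t.interrupt();
        try {
            t.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isRunning() {
        Thread t = worker;
        return t != null && t.isAlive();
    }

    private void receive() {
        while (!stopped) {
            try {
                Thread.sleep(50); // stand-in for a blocking read from the source
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return; // interrupted by onStop(): leave the loop
            }
        }
    }

    public static void main(String[] args) {
        StoppableWorker w = new StoppableWorker();
        w.onStart();
        System.out.println("running after start: " + w.isRunning());
        w.onStop();
        System.out.println("running after stop: " + w.isRunning());
    }
}
```

In a real receiver the loop would normally check the receiver's own stopped state instead of a private flag, and `onStop` would also close the client connection.
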

On Mon, Oct 31, 2016 at 11:50 AM, kant kodali  wrote:

> I am also under the assumption that the *onStart* function of the Receiver
> is called only once by Spark. Please correct me if I am wrong.
>
> On Mon, Oct 31, 2016 at 11:35 AM, kant kodali  wrote:
>
>> My driver program runs a Spark Streaming job, and it spawns a thread by
>> itself only in the *onStart()* function below. Other than that it doesn't
>> spawn any other threads. It only calls the mapToPair, reduceByKey, foreachRDD
>> and collect functions.
>>
>> public class NSQReceiver extends Receiver {
>>
>> private String topic="";
>>
>> public NSQReceiver(String topic) {
>> super(StorageLevel.MEMORY_AND_DISK_2());
>> this.topic = topic;
>> }
>>
>> @Override
>> public void onStart() {
>> // Spawn the receiving thread and return immediately.
>> new Thread() {
>> @Override public void run() {
>> receive();
>> }
>> }.start();
>> }
>>
>> }
>>
>>
>> Environment info:
>>
>> Java 8
>>
>> Scala 2.11.8
>>
>> Spark 2.0.0
>>
>> More than happy to share any other info you may need.
>>
>>
>> On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky 
>> wrote:
>>
>>>  > how do I tell my spark driver program to not create so many?
>>>
>>> This may depend on your driver program. Do you spawn any threads in
>>> it? Could you share some more information on the driver program, spark
>>> version and your environment? It would greatly help others to help you
>>>
>>> On Mon, Oct 31, 2016 at 3:47 AM, kant kodali  wrote:
>>> > The source of my problem is actually that I am running into the following
>>> > error. This error seems to happen after running my driver program for 4
>>> > hours.
>>> >
>>> > "Exception in thread "ForkJoinPool-50-worker-11" Exception in thread
>>> > "dag-scheduler-event-loop" Exception in thread "ForkJoinPool-50-worker-13"
>>> > java.lang.OutOfMemoryError: unable to create new native thread"
>>> >
>>> > and this wonderful book taught me that the error "unable to create new
>>> > native thread" can happen because the JVM requests a thread from the OS
>>> > and the OS refuses, for one of the following reasons:
>>> >
>>> > 1. The system has actually run out of virtual memory.
>>> > 2. On Unix-style systems, the user has already created (between all programs
>>> > the user is running) the maximum number of processes configured for that user
>>> > login. Individual threads are considered a process in that regard.
>>> >
>>> > Option #2 is ruled out in my case because my driver program is running
>>> > with a userid of root, which has the maximum number of processes set to
>>> > 120242.
>>> >
>>> > ulimit -a gives me the following
>>> >
>>> > core file size  (blocks, -c) 0
>>> > data seg size   (kbytes, -d) unlimited
>>> > scheduling priority (-e) 0
>>> > file size   (blocks, -f) unlimited
>>> > pending signals (-i) 120242
>>> > max locked memory   (kbytes, -l) 64
>>> > max memory size (kbytes, -m) unlimited
>>> > open files  (-n) 1024
>>> > pipe size(512 bytes, -p) 8
>>> > POSIX message queues (bytes, -q) 819200
>>> > real-time priority  (-r) 0
>>> > stack size  (kbytes, -s) 8192
>>> > cpu time   (seconds, -t) unlimited
>>> > max user processes  (-u) 120242
>>> > virtual memory  (kbytes, -v) unlimited
>>> > file locks  (-x) unlimited
>>> >
>>> > So at this point I do understand that I am running out of memory due to
>>> > the allocation of threads, so my biggest question is: how do I tell my
>>> > Spark driver program to not create so many?
>>> >
>>> > On Mon, Oct 31, 2016 at 3:25 AM, Sean Owen  wrote:
>>> >>
>>> >> ps -L [pid] is what shows threads. I am not sure this is counting what you
>>> >> think it does. My shell process has about a hundred threads, and I can't
>>> >> imagine why one would have thousands unless your app spawned them.
>>> >>
>>> >> On Mon, Oct 31, 2016 at 10:20 AM kant kodali wrote:
>>> >>>
>>> >>> when I do
>>> >>>
>>> >>> ps -elfT | grep "spark-driver-program.jar" | wc -l
>>> >>>
>>> >>> The result is around 32K. Why does it create so many threads, and how can I
>>> >>> limit this?
>>> >
>>> >
>>>
>>
>>
>


Re: Map with state keys serialization

2016-10-12 Thread Shixiong(Ryan) Zhu
Oh, OpenHashMapBasedStateMap is serialized using Kryo's
"com.esotericsoftware.kryo.serializers.JavaSerializer". Did you set it for
OpenHashMapBasedStateMap? You don't need to set anything for Spark's
classes in 1.6.2.


On Wed, Oct 12, 2016 at 7:11 AM, Joey Echeverria <j...@rocana.com> wrote:

> I tried with 1.6.2 and saw the same behavior.
>
> -Joey
>
> On Tue, Oct 11, 2016 at 5:18 PM, Shixiong(Ryan) Zhu
> <shixi...@databricks.com> wrote:
> > There are some known issues in 1.6.0, e.g.,
> > https://issues.apache.org/jira/browse/SPARK-12591
> >
> > Could you try 1.6.1?
> >
> > On Tue, Oct 11, 2016 at 9:55 AM, Joey Echeverria <j...@rocana.com>
> wrote:
> >>
> >> I tried wrapping my Tuple class (which is generated by Avro) in a
> >> class that implements Serializable, but now I'm getting a
> >> ClassNotFoundException in my Spark application. The exception is
> >> thrown while trying to deserialize checkpoint state:
> >>
> >> https://gist.github.com/joey/7b374a2d483e25f15e20c0c4cb81b5cf
> >>
> >> I set some flags[1] on the JVM and I can see the class get loaded in the
> >> logs.
> >>
> >> Does anyone have any suggestions/recommendations for debugging class
> >> loading issues during checkpoint deserialization?
> >>
> >> I also looked into switching to byte[] for the state keys, but byte[]
> >> doesn't implement value-based equals() or hashCode(). I can't use
> >> ByteBuffer because it doesn't implement Serializable. Spark has a
> >> SerializableBuffer class that wraps ByteBuffer, but it also doesn't
> >> have value-based equals() or hashCode().
> >>
> >> -Joey
> >>
> >> [1] -verbose:class -Dsun.misc.URLClassPath.debug
> >>
> >> On Mon, Oct 10, 2016 at 11:28 AM, Joey Echeverria <j...@rocana.com>
> wrote:
> >> > I do, I get the stack trace in this gist:
> >> >
> >> > https://gist.github.com/joey/d3bf040af31e854b3be374e2c016d7e1
> >> >
> >> > The class it references, com.rocana.data.Tuple, is registered with
> >> > Kryo. Also, this is with 1.6.0 so if this behavior changed/got fixed
> >> > in a later release let me know.
> >> >
> >> > -Joey
> >> >
> >> > On Mon, Oct 10, 2016 at 9:54 AM, Shixiong(Ryan) Zhu
> >> > <shixi...@databricks.com> wrote:
> >> >> That's enough. Did you see any error?
> >> >>
> >> >> On Mon, Oct 10, 2016 at 5:08 AM, Joey Echeverria <j...@rocana.com>
> >> >> wrote:
> >> >>>
> >> >>> Hi Ryan!
> >> >>>
> >> >>> Do you know where I need to configure Kryo for this? I already have
> >> >>> spark.serializer=org.apache.spark.serializer.KryoSerializer in my
> >> >>> SparkConf and I registered the class. Is there a different
> >> >>> configuration setting for the state map keys?
> >> >>>
> >> >>> Thanks!
> >> >>>
> >> >>> -Joey
> >> >>>
> >> >>> On Sun, Oct 9, 2016 at 10:58 PM, Shixiong(Ryan) Zhu
> >> >>> <shixi...@databricks.com> wrote:
> >> >>> > You can use Kryo. It also implements KryoSerializable, which is
> >> >>> > supported by Kryo.
> >> >>> >
> >> >>> > On Fri, Oct 7, 2016 at 11:39 AM, Joey Echeverria <j...@rocana.com>
> >> >>> > wrote:
> >> >>> >>
> >> >>> >> Looking at the source code for StateMap[1], which is used by
> >> >>> >> JavaPairDStream#mapWithState(), it looks like state keys are
> >> >>> >> serialized using an ObjectOutputStream. I couldn't find a reference
> >> >>> >> to this restriction in the documentation. Did I miss that?
> >> >>> >>
> >> >>> >> Unless I'm mistaken, I'm guessing there isn't a way to use Kryo
> >> >>> >> for this serialization?
> >> >>> >>
> >> >>> >> Thanks!
> >> >>> >>
> >> >>> >> -Joey
> >> >>> >>
> >> >>> >> [1]
> >> >>> >>
> >> >>> >>
> >> >>> >> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala#L251
> >> >>> >>
> >> >>> >>
> >> >>> >> 
> >> >>> >>
> >> >>> >
> >> >>>
> >> >>>
> >> >>>
> >> >>> --
> >> >>> -Joey
> >> >>
> >> >>
> >> >
> >> >
> >> >
> >> > --
> >> > -Joey
> >>
> >>
> >>
> >> --
> >> -Joey
> >
> >
>
>
>
> --
> -Joey
>


Re: Map with state keys serialization

2016-10-11 Thread Shixiong(Ryan) Zhu
There are some known issues in 1.6.0, e.g.,
https://issues.apache.org/jira/browse/SPARK-12591

Could you try 1.6.1?

On Tue, Oct 11, 2016 at 9:55 AM, Joey Echeverria <j...@rocana.com> wrote:

> I tried wrapping my Tuple class (which is generated by Avro) in a
> class that implements Serializable, but now I'm getting a
> ClassNotFoundException in my Spark application. The exception is
> thrown while trying to deserialize checkpoint state:
>
> https://gist.github.com/joey/7b374a2d483e25f15e20c0c4cb81b5cf
>
> I set some flags[1] on the JVM and I can see the class get loaded in the
> logs.
>
> Does anyone have any suggestions/recommendations for debugging class
> loading issues during checkpoint deserialization?
>
> I also looked into switching to byte[] for the state keys, but byte[]
> doesn't implement value-based equals() or hashCode(). I can't use
> ByteBuffer because it doesn't implement Serializable. Spark has a
> SerializableBuffer class that wraps ByteBuffer, but it also doesn't
> have value-based equals() or hashCode().
>
> -Joey
>
> [1] -verbose:class -Dsun.misc.URLClassPath.debug
>
> On Mon, Oct 10, 2016 at 11:28 AM, Joey Echeverria <j...@rocana.com> wrote:
> > I do, I get the stack trace in this gist:
> >
> > https://gist.github.com/joey/d3bf040af31e854b3be374e2c016d7e1
> >
> > The class it references, com.rocana.data.Tuple, is registered with
> > Kryo. Also, this is with 1.6.0 so if this behavior changed/got fixed
> > in a later release let me know.
> >
> > -Joey
> >
> > On Mon, Oct 10, 2016 at 9:54 AM, Shixiong(Ryan) Zhu
> > <shixi...@databricks.com> wrote:
> >> That's enough. Did you see any error?
> >>
> >> On Mon, Oct 10, 2016 at 5:08 AM, Joey Echeverria <j...@rocana.com>
> wrote:
> >>>
> >>> Hi Ryan!
> >>>
> >>> Do you know where I need to configure Kryo for this? I already have
> >>> spark.serializer=org.apache.spark.serializer.KryoSerializer in my
> >>> SparkConf and I registered the class. Is there a different
> >>> configuration setting for the state map keys?
> >>>
> >>> Thanks!
> >>>
> >>> -Joey
> >>>
> >>> On Sun, Oct 9, 2016 at 10:58 PM, Shixiong(Ryan) Zhu
> >>> <shixi...@databricks.com> wrote:
> >>> > You can use Kryo. It also implements KryoSerializable, which is
> >>> > supported by Kryo.
> >>> >
> >>> > On Fri, Oct 7, 2016 at 11:39 AM, Joey Echeverria <j...@rocana.com>
> >>> > wrote:
> >>> >>
> >>> >> Looking at the source code for StateMap[1], which is used by
> >>> >> JavaPairDStream#mapWithState(), it looks like state keys are
> >>> >> serialized using an ObjectOutputStream. I couldn't find a reference
> >>> >> to this restriction in the documentation. Did I miss that?
> >>> >>
> >>> >> Unless I'm mistaken, I'm guessing there isn't a way to use Kryo for
> >>> >> this serialization?
> >>> >>
> >>> >> Thanks!
> >>> >>
> >>> >> -Joey
> >>> >>
> >>> >> [1]
> >>> >>
> >>> >> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala#L251
> >>> >>
> >>> >> 
> >>> >>
> >>> >
> >>>
> >>>
> >>>
> >>> --
> >>> -Joey
> >>
> >>
> >
> >
> >
> > --
> > -Joey
>
>
>
> --
> -Joey
>
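
On the byte[] key idea quoted above: one possible workaround, sketched here under the assumption that the state key can be reduced to its raw bytes, is a small wrapper that is Serializable and compares by content. `BytesKey` is a made-up name and not a Spark or Avro class.

```java
import java.io.Serializable;
import java.util.Arrays;

// Hypothetical key wrapper: gives a byte[] value-based equals()/hashCode()
// and Java serializability, so it can be used as a map key.
final class BytesKey implements Serializable {
    private static final long serialVersionUID = 1L;

    private final byte[] bytes;

    BytesKey(byte[] bytes) {
        this.bytes = bytes.clone(); // defensive copy keeps the key immutable
    }

    byte[] toByteArray() {
        return bytes.clone();
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof BytesKey
            && Arrays.equals(bytes, ((BytesKey) other).bytes);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(bytes);
    }

    public static void main(String[] args) {
        BytesKey a = new BytesKey(new byte[] {1, 2, 3});
        BytesKey b = new BytesKey(new byte[] {1, 2, 3});
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode());
    }
}
```

This does not address the ClassNotFoundException during checkpoint recovery; it only covers the equals()/hashCode() gap mentioned for byte[] and ByteBuffer.
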


Re: Map with state keys serialization

2016-10-10 Thread Shixiong(Ryan) Zhu
That's enough. Did you see any error?

On Mon, Oct 10, 2016 at 5:08 AM, Joey Echeverria <j...@rocana.com> wrote:

> Hi Ryan!
>
> Do you know where I need to configure Kryo for this? I already have
> spark.serializer=org.apache.spark.serializer.KryoSerializer in my
> SparkConf and I registered the class. Is there a different
> configuration setting for the state map keys?
>
> Thanks!
>
> -Joey
>
> On Sun, Oct 9, 2016 at 10:58 PM, Shixiong(Ryan) Zhu
> <shixi...@databricks.com> wrote:
> > You can use Kryo. It also implements KryoSerializable, which is supported
> > by Kryo.
> >
> > On Fri, Oct 7, 2016 at 11:39 AM, Joey Echeverria <j...@rocana.com>
> wrote:
> >>
> >> Looking at the source code for StateMap[1], which is used by
> >> JavaPairDStream#mapWithState(), it looks like state keys are
> >> serialized using an ObjectOutputStream. I couldn't find a reference to
> >> this restriction in the documentation. Did I miss that?
> >>
> >> Unless I'm mistaken, I'm guessing there isn't a way to use Kryo for
> >> this serialization?
> >>
> >> Thanks!
> >>
> >> -Joey
> >>
> >> [1]
> >> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala#L251
> >>
> >>
> >
>
>
>
> --
> -Joey
>


Re: Unresponsive Spark Streaming UI in YARN cluster mode - 1.5.2

2016-07-08 Thread Shixiong(Ryan) Zhu
Hey Tom,

Could you provide the stack traces of all the blocked threads? This may be
caused by a deadlock.

On Fri, Jul 8, 2016 at 10:30 AM, Ellis, Tom (Financial Markets IT) <
tom.el...@lloydsbanking.com.invalid> wrote:

> Hi There,
>
>
>
> We’re currently using HDP 2.3.4, Spark 1.5.2 with a Spark Streaming job in
> YARN Cluster mode consuming from a high volume Kafka topic. When we try to
> access the Spark Streaming UI on the application master, it is
> unresponsive/hangs or sometimes comes back with connection refused.
>
>
>
> It seems this UI is resident on the driver, and looking at its thread dump
> we see the below. Other tabs in the UI are fine. Does anyone have any
> ideas? Any further info required just ask.
>
>
>
> Thread 258: qtp1595613401-258 - /streaming/ (BLOCKED)
>
>
> org.apache.spark.streaming.DStreamGraph.getInputStreamName(DStreamGraph.scala:114)
>
>
> org.apache.spark.streaming.ui.StreamingJobProgressListener.streamName(StreamingJobProgressListener.scala:188)
>
>
> org.apache.spark.streaming.ui.StreamingPage$$anonfun$21.apply(StreamingPage.scala:429)
>
>
> org.apache.spark.streaming.ui.StreamingPage$$anonfun$21.apply(StreamingPage.scala:429)
>
> scala.Option.orElse(Option.scala:257)
>
> org.apache.spark.streaming.ui.StreamingPage.org
> $apache$spark$streaming$ui$StreamingPage$$generateInputDStreamRow(StreamingPage.scala:429)
>
>
> org.apache.spark.streaming.ui.StreamingPage$$anonfun$18.apply(StreamingPage.scala:396)
>
>
> org.apache.spark.streaming.ui.StreamingPage$$anonfun$18.apply(StreamingPage.scala:395)
>
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>
> scala.collection.AbstractTraversable.map(Traversable.scala:105)
>
>
> org.apache.spark.streaming.ui.StreamingPage.generateInputDStreamsTable(StreamingPage.scala:395)
>
>
> org.apache.spark.streaming.ui.StreamingPage.generateStatTable(StreamingPage.scala:348)
>
> org.apache.spark.streaming.ui.StreamingPage.render(StreamingPage.scala:157)
>
> org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:79)
>
> org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:79)
>
> org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:69)
>
> javax.servlet.http.HttpServlet.service(HttpServlet.java:735)
>
> javax.servlet.http.HttpServlet.service(HttpServlet.java:848)
>
>
> org.spark-project.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
>
>
> org.spark-project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
>
>
> org.spark-project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
>
>
> org.spark-project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)
>
>
> org.spark-project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
>
>
> org.spark-project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
>
>
> org.spark-project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)
>
>
> org.spark-project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
>
> org.spark-project.jetty.server.Server.handle(Server.java:366)
>
>
> org.spark-project.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
>
>
> org.spark-project.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971)
>
>
> org.spark-project.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033)
>
> org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:644)
>
> org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
>
>
> org.spark-project.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
>
>
> org.spark-project.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667)
>
>
> org.spark-project.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
>
>
> org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
>
>
> org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
>
> java.lang.Thread.run(Thread.java:745)
>
>
>
> Thread 33: SparkListenerBus (BLOCKED)
>
>
> org.apache.spark.streaming.ui.StreamingJobProgressListener.onJobStart(StreamingJobProgressListener.scala:123)
>
>
> org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:34)
>
>
> org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
>
>
> org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
>
> org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:56)
>
>
> 

Re: Spark and Kafka direct approach problem

2016-05-04 Thread Shixiong(Ryan) Zhu
It's because the Scala version of Spark and the Scala version of Kafka
don't match. Please check them.
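
One quick way to check is to look at the Scala binary suffix in the jar names on your classpath. A rough sketch follows; `ScalaSuffixCheck` is a made-up helper, the jar names in `main` are only illustrative and not taken from your setup, and it assumes the usual Maven naming `artifact_<scala version>-<version>.jar`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the Scala binary version suffix from jar file
// names and reports whether all Scala-based jars agree on it.
final class ScalaSuffixCheck {
    // Matches names such as "kafka_2.11-0.8.2.1.jar" and captures "2.11".
    private static final Pattern SUFFIX = Pattern.compile("_(\\d+\\.\\d+)-");

    static Map<String, String> scalaVersions(Iterable<String> jarNames) {
        Map<String, String> versions = new LinkedHashMap<>();
        for (String jar : jarNames) {
            Matcher m = SUFFIX.matcher(jar);
            if (m.find()) {
                versions.put(jar, m.group(1)); // jars without a suffix are skipped
            }
        }
        return versions;
    }

    static boolean isConsistent(Iterable<String> jarNames) {
        return new HashSet<>(scalaVersions(jarNames).values()).size() <= 1;
    }

    public static void main(String[] args) {
        // Illustrative names only.
        Iterable<String> jars = Arrays.asList(
            "spark-streaming-kafka_2.10-1.6.1.jar",
            "kafka_2.11-0.8.2.1.jar");
        System.out.println(scalaVersions(jars));
        System.out.println("consistent: " + isConsistent(jars));
    }
}
```

If the suffixes differ, pick the Kafka and Spark artifacts built for the same Scala version.
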

On Wed, May 4, 2016 at 6:17 AM, أنس الليثي  wrote:

> NoSuchMethodError usually appears because of a difference in the library
> versions.
>
> Check the version of the libraries you downloaded, the version of spark,
> the version of Kafka.
>
> On 4 May 2016 at 16:18, Luca Ferrari  wrote:
>
>> Hi,
>>
>> I’m new on Apache Spark and I’m trying to run the Spark Streaming +
>> Kafka Integration Direct Approach example (JavaDirectKafkaWordCount.java
>> ).
>>
>> I’ve downloaded all the libraries but when I try to run I get this error
>>
>> Exception in thread "main" java.lang.NoSuchMethodError:
>> scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
>>
>> at kafka.api.RequestKeys$.<init>(RequestKeys.scala:48)
>>
>> at kafka.api.RequestKeys$.<clinit>(RequestKeys.scala)
>>
>> at kafka.api.TopicMetadataRequest.<init>(TopicMetadataRequest.scala:55)
>>
>> at
>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
>>
>> at
>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
>>
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>>
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
>>
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
>> at it.unimi.di.luca.SimpleApp.main(SimpleApp.java:53)
>>
>> Any suggestions?
>>
>> Cheers
>> Luca
>>
>>
>
>
>
> --
> Anas Rabei
> Senior Software Developer
> Mubasher.info
> anas.ra...@mubasher.info
>


Re: how to deploy new code with checkpointing

2016-04-11 Thread Shixiong(Ryan) Zhu
You cannot. Spark Streaming doesn't support restarting from a checkpoint
after code changes, because the changes will break Java serialization.
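
To make the failure mode a bit more concrete, here is a minimal sketch in plain
Java. It does not use Spark's checkpoint code at all; it only shows the
underlying Java serialization behaviour. `CheckpointCompat` and `State` are
hypothetical names, and the "other build" is simulated by altering the
serialVersionUID recorded in the serialized bytes, which is an assumption made
to keep the example in one file. A real code change would typically alter the
class definitions instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for data saved in a checkpoint; not a Spark class.
final class CheckpointCompat {
    static final class State implements Serializable {
        private static final long serialVersionUID = 1L;
        final int count;

        State(int count) {
            this.count = count;
        }
    }

    /** Serializes an object with plain Java serialization. */
    static byte[] write(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Simulates bytes written by a different build by changing the recorded serialVersionUID. */
    static byte[] asIfWrittenByOtherBuild(byte[] data, Class<?> cls) {
        byte[] copy = data.clone();
        // Stream layout: 4-byte header, TC_OBJECT, TC_CLASSDESC, 2-byte name length,
        // the class name, then the 8-byte serialVersionUID.
        int uidOffset = 4 + 1 + 1 + 2 + cls.getName().length();
        copy[uidOffset + 7] ^= 0x01; // flip the lowest bit of the UID
        return copy;
    }

    /** Returns true if the bytes can no longer be read back with the current class definition. */
    static boolean failsToRestore(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            in.readObject();
            return false;
        } catch (InvalidClassException e) {
            return true; // stream and local class versions disagree
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] saved = write(new State(3));
        System.out.println("same build fails to restore:  " + failsToRestore(saved));
        System.out.println("other build fails to restore: "
            + failsToRestore(asIfWrittenByOtherBuild(saved, State.class)));
    }
}
```

The point of the sketch is only that serialized bytes are tied to the class
definitions that wrote them, so a new build may be unable to read them back.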

On Mon, Apr 11, 2016 at 4:30 PM, Siva Gudavalli  wrote:

> hello,
>
> i am writing a spark streaming application to read data from kafka. I am
> using no receiver approach and enabled checkpointing to make sure I am not
> reading messages again in case of failure. (exactly once semantics)
>
> i have a quick question how checkpointing needs to be configured to handle
> code changes in my spark streaming app.
>
> can you please suggest. hope the question makes sense.
>
> thank you
>
> regards
> shiv
>


Re: Can not kill driver properly

2016-03-21 Thread Shixiong(Ryan) Zhu
Could you post the log of Master?

On Mon, Mar 21, 2016 at 9:25 AM, Hao Ren  wrote:

> Update:
>
> I am using --supervise flag for fault tolerance.
>
>
>
> On Mon, Mar 21, 2016 at 4:16 PM, Hao Ren  wrote:
>
>> Using spark 1.6.1
>> Spark Streaming Jobs are submitted via spark-submit (cluster mode)
>>
>> I tried to kill drivers via webUI, it does not work. These drivers are
>> still running.
>> I also tried:
>> 1. spark-submit --master  --kill 
>> 2. ./bin/spark-class org.apache.spark.deploy.Client kill 
>> 
>>
>> Neither works. The workaround is to ssh to the driver node, then kill -9
>> ...
>> jps shows the same class name DriverWrapper, so I need to pick carefully...
>>
>> Any idea why this happens?
>> BTW, my streaming job's batch duration is one hour. So do we need to wait
>> for the job processing to finish in order to kill the driver?
>>
>> --
>> Hao Ren
>>
>> Data Engineer @ leboncoin
>>
>> Paris, France
>>
>
>
>
> --
> Hao Ren
>
> Data Engineer @ leboncoin
>
> Paris, France
>


Re: Docker configuration for akka spark streaming

2016-03-14 Thread Shixiong(Ryan) Zhu
Could you use netstat to show the ports that the driver is listening on?

On Mon, Mar 14, 2016 at 1:45 PM, David Gomez Saavedra 
wrote:

> hi everyone,
>
> I'm trying to set up spark streaming using akka with an example similar to
> the word count provided. When using spark master in local mode everything
> works, but when I try to run the driver and executors using docker I get
> the following exception
>
>
> 16/03/14 20:32:03 WARN NettyRpcEndpointRef: Error sending message [message = 
> Heartbeat(0,[Lscala.Tuple2;@5ad3f40c,BlockManagerId(0, 172.18.0.4, 7005))] in 
> 1 attempts
> org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 10 
> seconds. This timeout is controlled by spark.executor.heartbeatInterval
>   at 
> org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
>   at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
>   at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>   at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>   at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
>   at scala.util.Try$.apply(Try.scala:192)
>   at scala.util.Failure.recover(Try.scala:216)
>   at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
>   at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at 
> org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
>   at 
> scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
>   at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>   at scala.concurrent.Promise$class.complete(Promise.scala:55)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
>   at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
>   at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at 
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
>   at 
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
>   at 
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>   at 
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>   at 
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>   at 
> scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>   at 
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>   at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>   at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
>   at 
> org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:241)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 
> 10 seconds
>   at 
> org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:242)
>   ... 7 more
>
>
>
> Here is the config of the spark streaming app
>
> val conf = new SparkConf()
>   .setMaster(sparkMaster)
>   .setAppName(sparkApp)
>   .set("spark.cassandra.connection.host", CassandraConfig.host)
>   .set("spark.logConf", "true")
>   .set("spark.fileserver.port","7002")
>   .set("spark.broadcast.port","7003")
>   .set("spark.replClassServer.port","7004")
>   .set("spark.blockManager.port","7005")
>   .set("spark.executor.port","7006")
>   
> 

Re: Spark Streaming 1.6 mapWithState not working well with Kryo Serialization

2016-03-02 Thread Shixiong(Ryan) Zhu
See https://issues.apache.org/jira/browse/SPARK-12591

After applying the patch, it should work. However, if you want to enable
"registrationRequired", you still need to register
"org.apache.spark.streaming.util.OpenHashMapBasedStateMap",
"org.apache.spark.streaming.util.EmptyStateMap" and
"org.apache.spark.streaming.rdd.MapWithStateRDDRecord" by yourself because
these classes are defined in the Streaming project and we don't want to use
them in Spark core.


On Wed, Mar 2, 2016 at 1:41 PM, Aris  wrote:

> Hello Spark folks and especially TD,
>
> I am using the Spark Streaming 1.6 mapWithState API, and I am trying to
> enforce Kryo Serialization with
>
> SparkConf.set("spark.kryo.registrationRequired", "true")
>
> However, this appears to be impossible! I registered all the classes that
> are my own, but I have a problem with the
> class org.apache.spark.streaming.rdd.MapWithStateRDDRecord, which is set as
> private[streaming].
>
>
> The error:
>
> java.lang.IllegalArgumentException: Class is not registered:
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord
> Note: To register this class use:
> kryo.register(org.apache.spark.streaming.rdd.MapWithStateRDDRecord.class);
> at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
> at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
>
> Since this class is private within Spark Streaming itself, I cannot actually
> register it with Kryo, and I cannot use registrationRequired in order to
> make sure *everything* has been serialized with Kryo.
>
> Is this a bug? Can I somehow solve this?
>
> Aris
>


Re: spark streaming

2016-03-02 Thread Shixiong(Ryan) Zhu
Hey,

KafkaUtils.createDirectStream doesn't need a StorageLevel as it doesn't
store blocks to BlockManager. However, the error is not related
to StorageLevel. It may be a bug. Could you provide more info about it?
E.g., Spark version, your code, and logs.

On Wed, Mar 2, 2016 at 3:02 AM, Vinti Maheshwari 
wrote:

> Hi All,
>
> I wanted to set StorageLevel.MEMORY_AND_DISK_SER in my spark-streaming
> program as currently I am getting a
> MetadataFetchFailedException. I am not sure where I should pass
> StorageLevel.MEMORY_AND_DISK, as it seems like createDirectStream doesn't
> allow passing that parameter.
>
>
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
> StringDecoder](
>   ssc, kafkaParams, topicsSet)
>
>
> Full Error:
>
> *org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
> location for shuffle 0*
> at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
> at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at
> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
> at
> org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
> at
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> )
>
> Thanks,
> ~Vinti
>


Re: Spark executor killed without apparent reason

2016-03-01 Thread Shixiong(Ryan) Zhu
Could you search for "OutOfMemoryError" in the executor logs? It could be
"OutOfMemoryError: Direct buffer memory" or something else.

On Tue, Mar 1, 2016 at 6:23 AM, Nirav Patel  wrote:

> Hi,
>
> We are using spark 1.5.2 on yarn. We have a spark application utilizing
> about 15GB executor memory and 1500 overhead. However, at a certain stage we
> notice higher GC time (almost the same as task time). These executors are
> bound to get killed at some point. However, the nodemanager and resource manager
> logs don't indicate failure due to 'beyond physical/virtual memory
> limits', nor do I see any 'heap space' or 'gc overhead exceeded' errors in
> the executor logs. Some of these high-GC executors get killed eventually but I
> can't seem to find the reason. Based on application logs it seems like the executor
> didn't respond to the driver for a long period of time and the connection was reset.
>
> Following are logs from 'yarn logs -applicationId appId_1232_xxx'
>
>
> 16/02/24 11:09:47 INFO executor.Executor: Finished task 224.0 in stage 8.0
> (TID 15318). 2099 bytes result sent to driver
> 16/02/24 11:09:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned
> task 15333
> 16/02/24 11:09:47 INFO executor.Executor: Running task 239.0 in stage 8.0
> (TID 15333)
> 16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Getting 125
> non-empty blocks out of 3007 blocks
> 16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Started 14
> remote fetches in 10 ms
> 16/02/24 11:11:47 ERROR server.TransportChannelHandler: Connection to
> maprnode5 has been quiet for 12 ms while there are outstanding
> requests. Assuming connection is dead; please adjust spark.network.timeout
> if this is wrong.
> 16/02/24 11:11:47 ERROR client.TransportResponseHandler: Still have 1
> requests outstanding when connection from maprnode5 is closed
> 16/02/24 11:11:47 ERROR shuffle.OneForOneBlockFetcher: Failed while
> starting block fetches
> java.io.IOException: Connection from maprnode5 closed
> at
> org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
> at
> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
> at
> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
> at
> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
> at
> io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:744)
> 16/02/24 11:11:47 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3)
> for 6 outstanding blocks after 5000 ms
> 16/02/24 11:11:52 INFO client.TransportClientFactory: Found inactive
> connection to maprnode5, creating a new one.
> 16/02/24 11:12:16 WARN server.TransportChannelHandler: Exception in
> connection from maprnode5
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> at
> 

Re: Converting array to DF

2016-03-01 Thread Shixiong(Ryan) Zhu
For Array, you need to call `toSeq` first. Scala can convert an Array to
ArrayOps automatically. However, ArrayOps is not a `Seq`, so you need to call
`toSeq` explicitly.

On Tue, Mar 1, 2016 at 1:02 AM, Ashok Kumar 
wrote:

> Thank you sir
>
> This works OK
> import sqlContext.implicits._
> val weights = Seq(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f",
> 4), ("g", 6))
>
> weights.toDF("weights","value").orderBy(desc("value")).collect.foreach(println)
>
> Why did Array not work, please?
>
>
> On Tuesday, 1 March 2016, 8:51, Jeff Zhang  wrote:
>
>
> Change Array to Seq and import sqlContext.implicits._
>
>
>
> On Tue, Mar 1, 2016 at 4:38 PM, Ashok Kumar 
> wrote:
>
> Hi,
>
> I have this
>
> val weights = Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9),
> ("f", 4), ("g", 6))
> weights.toDF("weights","value")
>
> I want to convert the Array to DF but I get this error
>
> weights: Array[(String, Int)] = Array((a,3), (b,2), (c,5), (d,1), (e,9),
> (f,4), (g,6))
> <console>:33: error: value toDF is not a member of Array[(String, Int)]
>   weights.toDF("weights","value")
>
> I want to label columns and print out the contents in value order please I
> don't know why I am getting this error
>
> Thanks
>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>


Re: Using a non serializable third party JSON serializable on a spark worker node throws NotSerializableException

2016-03-01 Thread Shixiong(Ryan) Zhu
Could you show the full companion object? It looks weird to have
`override` in a companion object of a case class.

On Tue, Mar 1, 2016 at 11:16 AM, Yuval Itzchakov <yuva...@gmail.com> wrote:

> As I said, it is the method which eventually serializes the object. It is
> declared inside a companion object of a case class.
>
> The problem is that Spark will still try to serialize the method, as it
> needs to execute on the worker. How will that change the fact that
> `EncodeJson[T]` is not serializable?
>
>
> On Tue, Mar 1, 2016, 21:12 Shixiong(Ryan) Zhu <shixi...@databricks.com>
> wrote:
>
>> Don't know where "argonaut.EncodeJson$$anon$2" comes from. However, you
>> can always put your code into a method of an "object". Then just call it
>> like a Java static method.
>>
>> On Tue, Mar 1, 2016 at 10:30 AM, Yuval.Itzchakov <yuva...@gmail.com>
>> wrote:
>>
>>> I have a small snippet of code which relies on argonaut
>>> <http://argonaut.io/> for JSON serialization which is run from a
>>> `PairRDDFunctions.mapWithState` once a session is completed.
>>>
>>> This is the code snippet (not that important):
>>>
>>>   override def sendMessage(pageView: PageView): Unit = {
>>> Future {
>>>   LogHolder.logger.info(s"Sending pageview: ${pageView.id} to
>>> automation")
>>>   try {
>>> Http(url)
>>>   .postData(pageView.asJson.toString)
>>>   .option(HttpOptions.connTimeout(timeOutMilliseconds))
>>>   .asString
>>>   .throwError
>>>   }
>>>   catch {
>>> case NonFatal(e) => LogHolder.logger.error("Failed to send
>>> pageview", e)
>>>   }
>>> }
>>>   }
>>>
>>> argonaut relies on a user implementation of a trait called
>>> `EncodeJson[T]`,
>>> which tells argonaut how to serialize and deserialize the object.
>>>
>>> The problem is, that the trait `EncodeJson[T]` is not serializable, thus
>>> throwing a NotSerializableException:
>>>
>>> Caused by: java.io.NotSerializableException: argonaut.EncodeJson$$anon$2
>>> Serialization stack:
>>> - object not serializable (class: argonaut.EncodeJson$$anon$2,
>>> value: argonaut.EncodeJson$$anon$2@6415f61e)
>>>
>>> This is obvious and understandable.
>>>
>>> The question I have is - What possible ways are there to work around
>>> this?
>>> I currently depend on a third-party library which I can't control or
>>> change to implement Serializable in any way. I've seen this
>>> StackOverflow answer
>>> <
>>> http://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou
>>> >
>>> but couldn't implement any reasonable workaround.
>>>
>>> Anyone have any ideas?
>>>
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Using-a-non-serializable-third-party-JSON-serializable-on-a-spark-worker-node-throws-NotSerializablen-tp26372.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>


Re: Using a non serializable third party JSON serializable on a spark worker node throws NotSerializableException

2016-03-01 Thread Shixiong(Ryan) Zhu
Don't know where "argonaut.EncodeJson$$anon$2" comes from. However, you can
always put your code into a method of an "object". Then just call it like
a Java static method.
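
A minimal sketch of that idea in plain Java, with no Spark or argonaut
involved: a function that captures a non-serializable helper cannot be
serialized, while one that only calls a static method can. `Encoder` is a
hypothetical stand-in for a third-party class that is not Serializable,
`Encoders` plays the role of the Scala "object", and `SerFn` stands in for the
kind of closure that gets shipped to executors. All of these names are
assumptions made for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

final class StaticEncoderDemo {
    // Hypothetical stand-in for a third-party encoder that is not Serializable.
    static final class Encoder {
        String encode(String s) {
            return "{\"value\":\"" + s + "\"}";
        }
    }

    // A function type that can be serialized, like a closure sent to another JVM.
    interface SerFn extends Function<String, String>, Serializable {}

    // Counterpart of a Scala `object`: the encoder lives in a static field, so it is
    // created on whichever JVM runs the code and is never part of a serialized closure.
    static final class Encoders {
        private static final Encoder ENCODER = new Encoder();

        static String toJson(String s) {
            return ENCODER.encode(s);
        }
    }

    /** Builds a function that captures the non-serializable encoder. */
    static SerFn capturing() {
        Encoder local = new Encoder();
        return s -> local.encode(s);
    }

    /** Builds a function that captures nothing and only calls the static method. */
    static SerFn viaStatic() {
        return s -> Encoders.toJson(s);
    }

    /** Tries Java serialization and reports whether it succeeded. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("capturing closure serializable: " + canSerialize(capturing()));
        System.out.println("static-call closure serializable: " + canSerialize(viaStatic()));
    }
}
```

The design choice is that nothing non-serializable is referenced from the
closure itself; the helper is looked up through the static holder on the JVM
where the function actually runs.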

On Tue, Mar 1, 2016 at 10:30 AM, Yuval.Itzchakov  wrote:

> I have a small snippet of code which relies on argonaut
> for JSON serialization which is run from a
> `PairRDDFunctions.mapWithState` once a session is completed.
>
> This is the code snippet (not that important):
>
>   override def sendMessage(pageView: PageView): Unit = {
> Future {
>   LogHolder.logger.info(s"Sending pageview: ${pageView.id} to
> automation")
>   try {
> Http(url)
>   .postData(pageView.asJson.toString)
>   .option(HttpOptions.connTimeout(timeOutMilliseconds))
>   .asString
>   .throwError
>   }
>   catch {
> case NonFatal(e) => LogHolder.logger.error("Failed to send
> pageview", e)
>   }
> }
>   }
>
> argonaut relies on a user implementation of a trait called `EncodeJson[T]`,
> which tells argonaut how to serialize and deserialize the object.
>
> The problem is, that the trait `EncodeJson[T]` is not serializable, thus
> throwing a NotSerializableException:
>
> Caused by: java.io.NotSerializableException: argonaut.EncodeJson$$anon$2
> Serialization stack:
> - object not serializable (class: argonaut.EncodeJson$$anon$2,
> value: argonaut.EncodeJson$$anon$2@6415f61e)
>
> This is obvious and understandable.
>
> The question I have is - What possible ways are there to work around this?
> I currently depend on a third-party library which I can't control or
> change to implement Serializable in any way. I've seen this
> StackOverflow answer
> <
> http://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou
> >
> but couldn't implement any reasonable workaround.
>
> Anyone have any ideas?
>
>
>
>
>
>
>


Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-29 Thread Shixiong(Ryan) Zhu
Could you post the screenshot of the Streaming DAG and also the driver log?
It would be great if you have a simple producer for us to debug.

On Mon, Feb 29, 2016 at 1:39 AM, Abhishek Anand <abhis.anan...@gmail.com>
wrote:

> Hi Ryan,
>
> It's not working even after removing the reduceByKey.
>
> So, basically I am doing the following
> - reading from kafka
> - flatmap inside transform
> - mapWithState
> - rdd.count on output of mapWithState
>
> But to my surprise I still don't see checkpointing taking place.
>
> Is there any restriction to the type of operation that we can perform
> inside mapWithState ?
>
> Really need to resolve this one as currently if my application is
> restarted from checkpoint it has to repartition 120 previous stages which
> takes hell lot of time.
>
> Thanks !!
> Abhi
>
> On Mon, Feb 29, 2016 at 3:42 AM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Sorry that I forgot to tell you that you should also call `rdd.count()`
>> for "reduceByKey" as well. Could you try it and see if it works?
>>
>> On Sat, Feb 27, 2016 at 1:17 PM, Abhishek Anand <abhis.anan...@gmail.com>
>> wrote:
>>
>>> Hi Ryan,
>>>
>>> I am using mapWithState after doing reduceByKey.
>>>
>>> I am right now using mapWithState as you suggested and triggering the
>>> count manually.
>>>
>>> But, still unable to see any checkpointing taking place. In the DAG I
>>> can see that the reduceByKey operation for the previous batches are also
>>> being computed.
>>>
>>>
>>> Thanks
>>> Abhi
>>>
>>>
>>> On Tue, Feb 23, 2016 at 2:36 AM, Shixiong(Ryan) Zhu <
>>> shixi...@databricks.com> wrote:
>>>
>>>> Hey Abhi,
>>>>
>>>> Using reduceByKeyAndWindow and mapWithState will trigger the bug
>>>> in SPARK-6847. Here is a workaround to trigger checkpoint manually:
>>>>
>>>> JavaMapWithStateDStream<...> stateDStream =
>>>>     myPairDstream.mapWithState(StateSpec.function(mappingFunc));
>>>> // Force an action on each batch's RDD (the workaround for SPARK-6847).
>>>> stateDStream.foreachRDD(new Function<JavaRDD<...>, Void>() {
>>>>   @Override
>>>>   public Void call(JavaRDD<...> rdd) throws Exception {
>>>>     rdd.count();
>>>>     return null; // Void has no instances, so return null
>>>>   }
>>>> });
>>>> return stateDStream.stateSnapshots();
>>>>
>>>>
>>>> On Mon, Feb 22, 2016 at 12:25 PM, Abhishek Anand <
>>>> abhis.anan...@gmail.com> wrote:
>>>>
>>>>> Hi Ryan,
>>>>>
>>>>> Reposting the code.
>>>>>
>>>>> Basically my use case is something like - I am receiving the web
>>>>> impression logs and may get the notify (listening from kafka) for those
>>>>> impressions in the same interval (for me its 1 min) or any next interval
>>>>> (upto 2 hours). Now, when I receive notify for a particular impression I
>>>>> need to swap the date field in impression with the date field in notify
>>>>> logs. The notify for an impression has the same key as impression.
>>>>>
>>>>> static Function3<String, Optional, State,
>>>>> Tuple2<String, MyClass>> mappingFunc =
>>>>> new Function3<String, Optional, State,
>>>>> Tuple2<String, MyClass>>() {
>>>>> @Override
>>>>> public Tuple2<String, MyClass> call(String key, Optional one,
>>>>> State state) {
>>>>> MyClass nullObj = new MyClass();
>>>>> nullObj.setImprLog(null);
>>>>> nullObj.setNotifyLog(null);
>>>>> MyClass current = one.or(nullObj);
>>>>>
>>>>> if(current!= null && current.getImprLog() != null &&
>>>>> current.getMyClassType() == 1 /*this is impression*/){
>>>>> return new Tuple2<>(key, null);
>>>>> }
>>>>> else if (current.getNotifyLog() != null  && current.getMyClassType()
>>>>> == 3 /*notify for the impression received*/){
>>>>> MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>>> if(oldState!= null && oldState.getNotifyLog() != null){
>>>>> oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>>>  //swapping the dates
>>>>> return new Tuple2<>(key, oldState);
>>>>> }
>>>>> else{
>>>>> return

Re: Spark UI standalone "crashes" after an application finishes

2016-02-29 Thread Shixiong(Ryan) Zhu
Do you mean you cannot access the Master UI after your application completes?
Could you check the master log?

On Mon, Feb 29, 2016 at 3:48 PM, Sumona Routh  wrote:

> Hi there,
> I've been doing some performance tuning of our Spark application, which is
> using Spark 1.2.1 standalone. I have been using the spark metrics to graph
> out details as I run the jobs, as well as the UI to review the tasks and
> stages.
>
> I notice that after my application completes, or is near completion, the
> UI "crashes." I get a Connection Refused response. Sometimes, the page
> eventually recovers and will load again, but sometimes I end up having to
> restart the Spark master to get it back. When I look at my graphs on the
> app, the memory consumption (of driver, executors, and what I believe to be
> the daemon (spark.jvm.total.used)) appears to be healthy. Monitoring the
> master machine itself, memory and CPU appear healthy as well.
>
> Has anyone else seen this issue? Are there logs for the UI itself, and
> where might I find those?
>
> Thanks!
> Sumona
>


Re: java.io.IOException: java.lang.reflect.InvocationTargetException on new spark machines

2016-02-28 Thread Shixiong(Ryan) Zhu
This is because the Snappy library cannot load the native library. Did you
forget to install the snappy native library on your new machines?

On Fri, Feb 26, 2016 at 11:05 PM, Abhishek Anand 
wrote:

> Any insights on this ?
>
> On Fri, Feb 26, 2016 at 1:21 PM, Abhishek Anand 
> wrote:
>
>> On changing the default compression codec which is snappy to lzf the
>> errors are gone !!
>>
>> How can I fix this using snappy as the codec ?
>>
>> Is there any downside of using lzf as snappy is the default codec that
>> ships with spark.
>>
>>
>> Thanks !!!
>> Abhi
>>
>> On Mon, Feb 22, 2016 at 7:42 PM, Abhishek Anand 
>> wrote:
>>
>>> Hi ,
>>>
>>> I am getting the following exception on running my spark streaming job.
>>>
>>> The same job has been running fine since long and when I added two new
>>> machines to my cluster I see the job failing with the following exception.
>>>
>>>
>>>
>>> 16/02/22 19:23:01 ERROR Executor: Exception in task 2.0 in stage 4229.0
>>> (TID 22594)
>>> java.io.IOException: java.lang.reflect.InvocationTargetException
>>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
>>> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:59)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:744)
>>> Caused by: java.lang.reflect.InvocationTargetException
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>> at
>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
>>> at
>>> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
>>> at
>>> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
>>> at org.apache.spark.broadcast.TorrentBroadcast.org
>>> $apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
>>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254)
>>> ... 11 more
>>> Caused by: java.lang.IllegalArgumentException
>>> at
>>> org.apache.spark.io.SnappyCompressionCodec.<init>(CompressionCodec.scala:152)
>>> ... 20 more
>>>
>>>
>>>
>>> Thanks !!!
>>> Abhi
>>>
>>
>>
>


Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-28 Thread Shixiong(Ryan) Zhu
Sorry that I forgot to tell you that you should also call `rdd.count()` for
"reduceByKey" as well. Could you try it and see if it works?

On Sat, Feb 27, 2016 at 1:17 PM, Abhishek Anand <abhis.anan...@gmail.com>
wrote:

> Hi Ryan,
>
> I am using mapWithState after doing reduceByKey.
>
> I am right now using mapWithState as you suggested and triggering the
> count manually.
>
> But I am still unable to see any checkpointing taking place. In the DAG I can
> see that the reduceByKey operations for the previous batches are also being
> computed.
>
>
> Thanks
> Abhi
>
>
> On Tue, Feb 23, 2016 at 2:36 AM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Hey Abhi,
>>
>> Using reduceByKeyAndWindow and mapWithState will trigger the bug
>> in SPARK-6847. Here is a workaround to trigger the checkpoint manually:
>>
>> JavaMapWithStateDStream<...> stateDStream =
>>     myPairDstream.mapWithState(StateSpec.function(mappingFunc));
>> // Run an action on every batch so that the checkpoint is triggered.
>> stateDStream.foreachRDD(new Function<JavaRDD<...>, Void>() {
>>   @Override
>>   public Void call(JavaRDD<...> rdd) throws Exception {
>>     rdd.count();
>>     return null; // a Void method still has to return something
>>   }
>> });
>> return stateDStream.stateSnapshots();
>>
>>
>> On Mon, Feb 22, 2016 at 12:25 PM, Abhishek Anand <abhis.anan...@gmail.com
>> > wrote:
>>
>>> Hi Ryan,
>>>
>>> Reposting the code.
>>>
>>> Basically my use case is something like this: I am receiving the web
>>> impression logs and may get the notify (listening from kafka) for those
>>> impressions in the same interval (for me it's 1 min) or any later interval
>>> (up to 2 hours). Now, when I receive the notify for a particular impression I
>>> need to swap the date field in the impression with the date field in the
>>> notify logs. The notify for an impression has the same key as the impression.
>>>
>>> static Function3<String, Optional, State,
>>> Tuple2<String, MyClass>> mappingFunc =
>>> new Function3<String, Optional, State, Tuple2<String,
>>> MyClass>>() {
>>> @Override
>>> public Tuple2<String, MyClass> call(String key, Optional one,
>>> State state) {
>>> MyClass nullObj = new MyClass();
>>> nullObj.setImprLog(null);
>>> nullObj.setNotifyLog(null);
>>> MyClass current = one.or(nullObj);
>>>
>>> if(current!= null && current.getImprLog() != null &&
>>> current.getMyClassType() == 1 /*this is impression*/){
>>> return new Tuple2<>(key, null);
>>> }
>>> else if (current.getNotifyLog() != null  && current.getMyClassType() ==
>>> 3 /*notify for the impression received*/){
>>> MyClass oldState = (state.exists() ? state.get() : nullObj);
>>> if(oldState!= null && oldState.getNotifyLog() != null){
>>> oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>  // swapping the dates
>>> return new Tuple2<>(key, oldState);
>>> }
>>> else{
>>> return new Tuple2<>(key, null);
>>> }
>>> }
>>> else{
>>> return new Tuple2<>(key, null);
>>> }
>>>
>>> }
>>> };
>>>
>>>
>>> return
>>> myPairDstream.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots();
>>>
>>>
>>> Currently I am using reduceByKeyAndWindow without the inverse function
>>> and I am able to get the correct data. But the issue that might arise is
>>> when I have to restart my application from checkpoint: it repartitions and
>>> computes the previous 120 partitions, which delays the incoming batches.
>>>
>>>
>>> Thanks !!
>>> Abhi
>>>
>>> On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu <
>>> shixi...@databricks.com> wrote:
>>>
>>>> Hey Abhi,
>>>>
>>>> Could you post how you use mapWithState? By default, it should do
>>>> checkpointing every 10 batches.
>>>> However, there is a known issue that prevents mapWithState from
>>>> checkpointing in some special cases:
>>>> https://issues.apache.org/jira/browse/SPARK-6847
>>>>
>>>> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <
>>>> abhis.anan...@gmail.com> wrote:
>>>>
>>>>> Any Insights on this one ?
>>>>>
>>>>>
>>>>> Thanks !!!
>>>>> Abhi
>>>>>
>>>>> On Mon

Re: Access fields by name/index from Avro data read from Kafka through Spark Streaming

2016-02-25 Thread Shixiong(Ryan) Zhu
You can use `DStream.map` to transform objects to anything you want.

On Thu, Feb 25, 2016 at 11:06 AM, Mohammad Tariq  wrote:

> Hi group,
>
> I have just started working with Confluent Platform and Spark Streaming,
> and was wondering if it is possible to access individual fields from an
> Avro object read from a Kafka topic through Spark Streaming. By default,
> *KafkaUtils.createDirectStream[Object, Object,
> KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)* returns
> a *DStream[(Object, Object)]*, which doesn't have any schema associated with
> *Object* (or I am unable to figure it out). This makes it impossible to
> perform some operations on this DStream, for example, converting it to a
> Spark DataFrame.
>
> Since *KafkaAvroDecoder* doesn't allow us to have any other class but
> *Object*, I think I am going in the wrong direction. Any
> pointers/suggestions would be really helpful.
>
> *Versions used :*
> confluent-1.0.1
> spark-1.6.0-bin-hadoop2.4
> Scala code runner version - 2.11.6
>
> And this is the small piece of code I am using :
>
> package org.myorg.scalaexamples
>
> import org.apache.spark.rdd.RDD
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming._
> import org.apache.spark.SparkContext
> import org.apache.avro.mapred.AvroKey
> import org.apache.spark.sql.SQLContext
> //import org.apache.avro.mapred.AvroValue
> import org.apache.spark.streaming.kafka._
> import org.apache.spark.storage.StorageLevel
> import org.apache.avro.generic.GenericRecord
> import org.apache.spark.streaming.dstream.DStream
> import io.confluent.kafka.serializers.KafkaAvroDecoder
> //import org.apache.hadoop.io.serializer.avro.AvroRecord
> //import org.apache.spark.streaming.dstream.ForEachDStream
> import org.apache.spark.sql.SQLContext
> import org.apache.kafka.common.serialization.Deserializer
>
> object DirectKafkaWordCount {
>   def main(args: Array[String]) {
>     if (args.length < 2) {
>       System.err.println(s"""
>         |Usage: DirectKafkaWordCount <brokers> <topics>
>         |  <brokers> is a list of one or more Kafka brokers
>         |  <topics> is a list of one or more kafka topics to consume from
>         |
>         """.stripMargin)
>       System.exit(1)
>     }
>     val Array(brokers, topics) = args
>     val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[1]")
>     sparkConf.registerKryoClasses(Array(classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]]))
>     val ssc = new StreamingContext(sparkConf, Seconds(5))
>     val topicsSet = topics.split(",").toSet
>     val kafkaParams = Map[String, String](
>       "metadata.broker.list" -> brokers,
>       "group.id" -> "consumer",
>       "zookeeper.connect" -> "localhost:2181",
>       "schema.registry.url" -> "http://localhost:8081")
>     val messages = KafkaUtils.createDirectStream[Object, Object,
>       KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)
>     messages.print()
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> Thank you so much for your valuable time!
>
>
>
> Tariq, Mohammad
> about.me/mti
>
>


Re: PLease help: installation of spark 1.6.0 on ubuntu fails

2016-02-25 Thread Shixiong(Ryan) Zhu
Please use Java 7 instead.
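
For context, the two errors in the build log quoted below both come from `java.nio.file.Files`, a class that was added in Java 7, so an older JDK cannot compile `SparkBuild.scala`. A quick way to confirm that the JDK in use has it is a small check like the following sketch (the class name `NioFilesCheck` and its helper are made up for illustration, they are not part of Spark):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper: exercises java.nio.file.Files, the class the build could not find.
class NioFilesCheck {

    // Writes the content to a temp file, copies it with Files.copy, and reads the copy back.
    static String copyAndRead(String content) throws IOException {
        Path src = Files.createTempFile("nio-check-src", ".txt");
        Path dest = Files.createTempFile("nio-check-dest", ".txt");
        try {
            Files.write(src, content.getBytes(StandardCharsets.UTF_8));
            // dest already exists (createTempFile made it), so it must be replaced.
            Files.copy(src, dest, StandardCopyOption.REPLACE_EXISTING);
            return new String(Files.readAllBytes(dest), StandardCharsets.UTF_8);
        } finally {
            Files.deleteIfExists(src);
            Files.deleteIfExists(dest);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("java.version = " + System.getProperty("java.version"));
        System.out.println("copied: " + copyAndRead("hello"));
    }
}
```

If this compiles and prints the copied text, the JDK is new enough for the `Files.copy` call in the build definition.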

On Thu, Feb 25, 2016 at 1:54 PM, Marco Mistroni  wrote:

> Hello all,
> could anyone help?
> I have tried to install Spark 1.6.0 on Ubuntu, but the installation failed.
> Here are my steps:
>
> 1. download spark (successful)
>
> wget http://d3kbcqa49mib13.cloudfront.net/spark-1.6.0.tgz
>
> tar -zxf spark-1.6.0.tgz
>
>
>
> 2. cd spark-1.6.0
>
> 2.1 sbt assembly
>
>
>
> [error] /home/vagrant/spark-1.6.0/project/SparkBuild.scala:19: object file is not a member of package java.nio
> [error] import java.nio.file.Files
> [error] ^
> [error] /home/vagrant/spark-1.6.0/project/SparkBuild.scala:465: not found: value Files
> [error]   Files.copy(jar.toPath, dest.toPath)
> [error]   ^
> [error] two errors found
>
> could anyone assist?
> kind regards
>


Re: Spark Streaming not reading input coming from the other ip

2016-02-22 Thread Shixiong(Ryan) Zhu
What error does Streaming report? Also, could you use "telnet" to
test whether the network connection works?
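
If telnet is not available, roughly the same check can be done from Java. As far as I know, `socketTextStream` connects to the given host and port as a TCP client and reads newline-terminated text, so the sketch below does the same thing by hand. `SocketCheck` and `readFirstLine` are hypothetical names, and the in-process server in `main` is only a local stand-in for the `nc -lk` listener:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical connectivity check, not part of Spark.
class SocketCheck {

    // Connects to host:port as a client and returns the first line received,
    // or null if the peer closes the connection without sending a line.
    static String readFirstLine(String host, int port, int timeoutMillis) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            socket.setSoTimeout(timeoutMillis); // fail instead of waiting forever for data
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            return reader.readLine();
        }
    }

    public static void main(String[] args) throws Exception {
        // Local stand-in for the listener: sends one line to the first client that connects.
        try (ServerSocket server = new ServerSocket(0)) {
            Thread sender = new Thread(() -> {
                try (Socket client = server.accept();
                     PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
                    out.println("hello from the sender");
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            sender.start();
            System.out.println(readFirstLine("127.0.0.1", server.getLocalPort(), 5000));
            sender.join();
        }
    }
}
```

Running `readFirstLine` from the machine where the receiver runs, pointed at the real host and port, shows whether a line actually arrives over the network.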

On Mon, Feb 22, 2016 at 6:59 AM, Vinti Maheshwari 
wrote:

> For reference, my program:
>
> def main(args: Array[String]): Unit = {
>   val conf = new SparkConf().setAppName("HBaseStream")
>   val sc = new SparkContext(conf)
>   // create a StreamingContext, the main entry point for all streaming functionality
>   val ssc = new StreamingContext(sc, Seconds(2))
>   val inputStream = ssc.socketTextStream(<host>, <port>)
>   ...
> }
>
> Data is coming from a different host to this socket, but somehow Spark is not
> reading it, whereas if I copy and paste the same data, it works.
>
>  [root@ ~]# nc -lk 
> 56b4b2b23c24c3608376d1f0,/obj/i386/junos/lib/librtsock/rtslib_gencfg.So.gcda,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
> 56b4b2b23c24c3608376d1f0,/obj/i386/junos/lib/librtsock/rtslib_idl.So.gcda,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
>
> On Mon, Feb 22, 2016 at 6:38 AM, Vinti Maheshwari 
> wrote:
>
>> Hi
>>
>> I am using a Spark Streaming context and reading input from a socket
>> fed by nc -lk. When I run it and type the input manually, it works. But
>> if the input comes to this socket from a different IP, Spark does not
>> read it, even though nc -lk shows all the input arriving from the other
>> source.
>>
>> I am not sure what the issue could be. Does anyone have an idea? Thanks in
>> advance.
>>
>>
>> Thanks
>>
>> ~Vinti
>>
>
>


Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-22 Thread Shixiong(Ryan) Zhu
Hey Abhi,

Using reduceByKeyAndWindow and mapWithState will trigger the bug
in SPARK-6847. Here is a workaround to trigger the checkpoint manually:

JavaMapWithStateDStream<...> stateDStream =
    myPairDstream.mapWithState(StateSpec.function(mappingFunc));
// Run an action on every batch so that the checkpoint is triggered.
stateDStream.foreachRDD(new Function<JavaRDD<...>, Void>() {
  @Override
  public Void call(JavaRDD<...> rdd) throws Exception {
    rdd.count();
    return null; // a Void method still has to return something
  }
});
return stateDStream.stateSnapshots();


On Mon, Feb 22, 2016 at 12:25 PM, Abhishek Anand <abhis.anan...@gmail.com>
wrote:

> Hi Ryan,
>
> Reposting the code.
>
> Basically my use case is something like this: I am receiving the web
> impression logs and may get the notify (listening from kafka) for those
> impressions in the same interval (for me it's 1 min) or any later interval
> (up to 2 hours). Now, when I receive the notify for a particular impression I
> need to swap the date field in the impression with the date field in the
> notify logs. The notify for an impression has the same key as the impression.
>
> static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
>     new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
>   @Override
>   public Tuple2<String, MyClass> call(String key, Optional<MyClass> one, State<MyClass> state) {
>     MyClass nullObj = new MyClass();
>     nullObj.setImprLog(null);
>     nullObj.setNotifyLog(null);
>     MyClass current = one.or(nullObj);
>
>     if (current != null && current.getImprLog() != null
>         && current.getMyClassType() == 1 /* this is an impression */) {
>       return new Tuple2<>(key, null);
>     } else if (current.getNotifyLog() != null
>         && current.getMyClassType() == 3 /* notify for the impression received */) {
>       MyClass oldState = (state.exists() ? state.get() : nullObj);
>       if (oldState != null && oldState.getNotifyLog() != null) {
>         // swapping the dates
>         oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>         return new Tuple2<>(key, oldState);
>       } else {
>         return new Tuple2<>(key, null);
>       }
>     } else {
>       return new Tuple2<>(key, null);
>     }
>   }
> };
>
>
> return
> myPairDstream.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots();
>
>
> Currently I am using reduceByKeyAndWindow without the inverse function and
> I am able to get the correct data. But the issue that might arise is when I
> have to restart my application from checkpoint: it repartitions and
> computes the previous 120 partitions, which delays the incoming batches.
>
>
> Thanks !!
> Abhi
>
> On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Hey Abhi,
>>
>> Could you post how you use mapWithState? By default, it should do
>> checkpointing every 10 batches.
>> However, there is a known issue that prevents mapWithState from
>> checkpointing in some special cases:
>> https://issues.apache.org/jira/browse/SPARK-6847
>>
>> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <abhis.anan...@gmail.com>
>> wrote:
>>
>>> Any Insights on this one ?
>>>
>>>
>>> Thanks !!!
>>> Abhi
>>>
>>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <
>>> abhis.anan...@gmail.com> wrote:
>>>
>>>> I am now trying to use mapWithState in the following way using some
>>>> example codes. But, by looking at the DAG it does not seem to checkpoint
>>>> the state and when restarting the application from checkpoint, it
>>>> re-partitions all the previous batches data from kafka.
>>>>
>>>> static Function3<String, Optional, State,
>>>> Tuple2<String, MyClass>> mappingFunc =
>>>> new Function3<String, Optional, State, Tuple2<String,
>>>> MyClass>>() {
>>>> @Override
>>>> public Tuple2<String, MyClass> call(String key, Optional one,
>>>> State state) {
>>>> MyClass nullObj = new MyClass();
>>>> nullObj.setImprLog(null);
>>>> nullObj.setNotifyLog(null);
>>>> MyClass current = one.or(nullObj);
>>>>
>>>> if(current!= null && current.getImprLog() != null &&
>>>> current.getMyClassType() == 1){
>>>> return new Tuple2<>(key, null);
>>>> }
>>>> else if (current.getNotifyLog() != null  && current.getMyClassType() ==
>>>> 3){
>>>> MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>> if(oldState!= null && oldState.getNotifyLog() != null){
>>>> oldState.g

Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-22 Thread Shixiong(Ryan) Zhu
Hey, Ted,

Because the fix for SPARK-6847 changes the semantics of Streaming checkpointing,
it will not go into branch-1.6.

A workaround is to call `count` to trigger the checkpoint manually. For example:

val dstream = ... // dstream is an operator needing to be checkpointed.
dstream.foreachRDD(rdd => rdd.count())


On Mon, Feb 22, 2016 at 12:25 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Fix for SPARK-6847 is not in branch-1.6
>
> Should the fix be ported to branch-1.6 ?
>
> Thanks
>
> On Feb 22, 2016, at 11:55 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com>
> wrote:
>
> Hey Abhi,
>
> Could you post how you use mapWithState? By default, it should do
> checkpointing every 10 batches.
> However, there is a known issue that prevents mapWithState from
> checkpointing in some special cases:
> https://issues.apache.org/jira/browse/SPARK-6847
>
> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <abhis.anan...@gmail.com>
> wrote:
>
>> Any Insights on this one ?
>>
>>
>> Thanks !!!
>> Abhi
>>
>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <abhis.anan...@gmail.com
>> > wrote:
>>
>>> I am now trying to use mapWithState in the following way using some
>>> example codes. But, by looking at the DAG it does not seem to checkpoint
>>> the state and when restarting the application from checkpoint, it
>>> re-partitions all the previous batches data from kafka.
>>>
>>> static Function3<String, Optional, State,
>>> Tuple2<String, MyClass>> mappingFunc =
>>> new Function3<String, Optional, State, Tuple2<String,
>>> MyClass>>() {
>>> @Override
>>> public Tuple2<String, MyClass> call(String key, Optional one,
>>> State state) {
>>> MyClass nullObj = new MyClass();
>>> nullObj.setImprLog(null);
>>> nullObj.setNotifyLog(null);
>>> MyClass current = one.or(nullObj);
>>>
>>> if(current!= null && current.getImprLog() != null &&
>>> current.getMyClassType() == 1){
>>> return new Tuple2<>(key, null);
>>> }
>>> else if (current.getNotifyLog() != null  && current.getMyClassType() ==
>>> 3){
>>> MyClass oldState = (state.exists() ? state.get() : nullObj);
>>> if(oldState!= null && oldState.getNotifyLog() != null){
>>> oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>> return new Tuple2<>(key, oldState);
>>> }
>>> else{
>>> return new Tuple2<>(key, null);
>>> }
>>> }
>>> else{
>>> return new Tuple2<>(key, null);
>>> }
>>>
>>> }
>>> };
>>>
>>>
>>> Please suggest if this is the proper way or am I doing something wrong.
>>>
>>>
>>> Thanks !!
>>> Abhi
>>>
>>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <sebastian@gmail.com>
>>> wrote:
>>>
>>>> If you don't want to update your only option will be updateStateByKey
>>>> then
>>>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>>
>>>>> mapWithState supports checkpoint.
>>>>>
>>>>> There has been some bug fix since release of 1.6.0
>>>>> e.g.
>>>>>   SPARK-12591 NullPointerException using checkpointed mapWithState
>>>>> with KryoSerializer
>>>>>
>>>>> which is in the upcoming 1.6.1
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <
>>>>> abhis.anan...@gmail.com> wrote:
>>>>>
>>>>>> Does mapWithState checkpoints the data ?
>>>>>>
>>>>>> When my application goes down and is restarted from checkpoint, will
>>>>>> mapWithState need to recompute the previous batches data ?
>>>>>>
>>>>>> Also, to use mapWithState I will need to upgrade my application as I
>>>>>> am using version 1.4.0 and mapWithState isnt supported there. Is there 
>>>>>> any
>>>>>> other work around ?
>>>>>>
>>>>>> Cheers!!
>>>>>> Abhi
>>>>>>
>>>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <
>>>>>> sebastian@gmail.com> wrote:
>>>>>>
>>>>>>> Looks like mapWithState could help you?
>>>>>>>

Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-22 Thread Shixiong(Ryan) Zhu
Hey Abhi,

Could you post how you use mapWithState? By default, it should do
checkpointing every 10 batches.
However, there is a known issue that prevents mapWithState from
checkpointing in some special cases:
https://issues.apache.org/jira/browse/SPARK-6847

On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand 
wrote:

> Any Insights on this one ?
>
>
> Thanks !!!
> Abhi
>
> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand 
> wrote:
>
>> I am now trying to use mapWithState in the following way using some
>> example codes. But, by looking at the DAG it does not seem to checkpoint
>> the state and when restarting the application from checkpoint, it
>> re-partitions all the previous batches data from kafka.
>>
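>> static Function3<String, Optional, State,
>> Tuple2<String, MyClass>> mappingFunc =
>> new Function3<String, Optional, State, Tuple2<String,
>> MyClass>>() {
>> @Override
>> public Tuple2<String, MyClass> call(String key, Optional one,
>> State state) {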
>> MyClass nullObj = new MyClass();
>> nullObj.setImprLog(null);
>> nullObj.setNotifyLog(null);
>> MyClass current = one.or(nullObj);
>>
>> if(current!= null && current.getImprLog() != null &&
>> current.getMyClassType() == 1){
>> return new Tuple2<>(key, null);
>> }
>> else if (current.getNotifyLog() != null  && current.getMyClassType() ==
>> 3){
>> MyClass oldState = (state.exists() ? state.get() : nullObj);
>> if(oldState!= null && oldState.getNotifyLog() != null){
>> oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>> return new Tuple2<>(key, oldState);
>> }
>> else{
>> return new Tuple2<>(key, null);
>> }
>> }
>> else{
>> return new Tuple2<>(key, null);
>> }
>>
>> }
>> };
>>
>>
>> Please suggest if this is the proper way or am I doing something wrong.
>>
>>
>> Thanks !!
>> Abhi
>>
>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu 
>> wrote:
>>
>>> If you don't want to update your only option will be updateStateByKey
>>> then
>>> On 13 Feb 2016 8:48 p.m., "Ted Yu"  wrote:
>>>
>>>> mapWithState supports checkpoint.
>>>>
>>>> There has been some bug fix since release of 1.6.0
>>>> e.g.
>>>>   SPARK-12591 NullPointerException using checkpointed mapWithState with
>>>> KryoSerializer
>>>>
>>>> which is in the upcoming 1.6.1
>>>>
>>>> Cheers
>>>>
>>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <
>>>> abhis.anan...@gmail.com> wrote:
>>>>
>>>>> Does mapWithState checkpoints the data ?
>>>>>
>>>>> When my application goes down and is restarted from checkpoint, will
>>>>> mapWithState need to recompute the previous batches data ?
>>>>>
>>>>> Also, to use mapWithState I will need to upgrade my application as I
>>>>> am using version 1.4.0 and mapWithState isnt supported there. Is there any
>>>>> other work around ?
>>>>>
>>>>> Cheers!!
>>>>> Abhi
>>>>>
>>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <
>>>>> sebastian@gmail.com> wrote:
>>>>>
>>>>>> Looks like mapWithState could help you?
>>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand"
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I have an use case like follows in my production environment where I
>>>>>>> am listening from kafka with slideInterval of 1 min and windowLength of 2
>>>>>>> hours.
>>>>>>>
>>>>>>> I have a JavaPairDStream where for each key I am getting the same
>>>>>>> key but with different value,which might appear in the same batch or some
>>>>>>> next batch.
>>>>>>>
>>>>>>> When the key appears second time I need to update a field in value
>>>>>>> of previous key with a field in the later key. The keys for which the
>>>>>>> combination keys do not come should be rejected after 2 hours.
>>>>>>>
>>>>>>> At the end of each second I need to output the result to external
>>>>>>> database.
>>>>>>>
>>>>>>> For example :
>>>>>>>
>>>>>>> Suppose valueX is object of MyClass with fields int a, String b
>>>>>>> At t=1sec I am getting
>>>>>>> key0,value0(0,"prev0")
>>>>>>> key1,value1 (1, "prev1")
>>>>>>> key2,value2 (2,"prev2")
>>>>>>> key2,value3 (3, "next2")
>>>>>>>
>>>>>>> Output to database after 1 sec
>>>>>>> key2, newValue (2,"next2")
>>>>>>>
>>>>>>> At t=2 sec getting
>>>>>>> key3,value4(4,"prev3")
>>>>>>> key1,value5(5,"next1")
>>>>>>>
>>>>>>> Output to database after 2 sec
>>>>>>> key1,newValue(1,"next1")
>>>>>>>
>>>>>>> At t=3 sec
>>>>>>> key4,value6(6,"prev4")
>>>>>>> key3,value7(7,"next3")
>>>>>>> key5,value5(8,"prev5")
>>>>>>> key5,value5(9,"next5")
>>>>>>> key0,value0(10,"next0")
>>>>>>>
>>>>>>> Output to database after 3 sec
>>>>>>> key0,newValue(0,"next0")
>>>>>>> key3,newValue(4,"next3")
>>>>>>> key5,newValue(8,"next5")
>>>>>>>
>>>>>>>
>>>>>>> Please suggest how this can be achieved.
>>>>>>>
>>>>>>>
>>>>>>> Thanks a lot
>>>>>>> Abhi
>>>>>>>
>>>>>
>>>>
>>
>


Re: Access to broadcasted variable

2016-02-19 Thread Shixiong(Ryan) Zhu
The broadcast object is serialized on the driver and sent to the executors.
Each executor then deserializes the bytes to get the broadcast
object.
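
To illustrate the round trip, here is a minimal sketch using plain Java serialization. It does not use any Spark classes, the name `BroadcastRoundTrip` is made up, and Spark's own serializer and transfer mechanism are more involved; the point is only that the receiving side rebuilds its own copy of the object from bytes:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-JDK illustration only; no Spark classes are involved.
class BroadcastRoundTrip {

    // "Driver" side: turn the object into bytes.
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // "Executor" side: rebuild an object from the received bytes.
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> original = new ArrayList<>(Arrays.asList("a", "b", "c"));
        byte[] wire = serialize(original);
        Object copy = deserialize(wire);
        System.out.println("same contents: " + copy.equals(original));
        System.out.println("separate instance: " + (copy != original));
    }
}
```

The two printed checks show that the rebuilt object has the same contents as the original but is a different instance.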

On Fri, Feb 19, 2016 at 5:54 AM, jeff saremi  wrote:

> could someone please comment on this? thanks
>
> --
> From: jeffsar...@hotmail.com
> To: user@spark.apache.org
> Subject: Access to broadcasted variable
> Date: Thu, 18 Feb 2016 14:44:07 -0500
>
>
>
> I'd like to know whether the broadcasted object gets serialized when it is
> accessed by the executor during the execution of a task.
> I know that it gets serialized on its way from the driver to the worker. This
> question is about what happens inside the worker, when the executor JVMs access it.
>
> thanks
> Jeff
>

