rocksdb without checkpointing

2017-02-15 Thread Abhishek R. Singh
Is it possible to set the state backend to RocksDB without asking it to checkpoint?

We are trying to do application-level checkpointing (it gives us better
flexibility to upgrade our Flink pipeline and to restore state in an
application-specific, upgrade-friendly way), so we don't really need RocksDB to
do any checkpointing. Moreover, we have also observed a 20-second stall every
hour that seems to correlate with RocksDB wanting to checkpoint.

Will the following work (effectively disable checkpointing)? 
new RocksDBStateBackend("file:///dev/null") 

Or is there a better way?
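
For reference, a minimal sketch of the setup being asked about, assuming a Flink 1.x-style 
job in Scala: set RocksDB as the state backend and simply never call enableCheckpointing(), 
so no periodic snapshots are triggered. Whether a throwaway URI like file:///dev/null is 
safe is exactly the open question above, so an ordinary writable path is used here.

// Sketch only (not from the thread): RocksDB as the state backend, with
// Flink's periodic checkpointing left disabled -- enableCheckpointing()
// is never called, so the backend never has to snapshot anything.
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala._

object NoCheckpointingSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // The checkpoint URI is mandatory, but it is only written to when a
    // checkpoint actually runs -- which never happens in this job.
    env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints"))

    env.socketTextStream("localhost", 9999).print()
    env.execute("rocksdb-without-checkpointing")
  }
}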

-Abhishek-

Re: weird client failure/timeout

2017-01-23 Thread Abhishek R. Singh
Actually, I take it back. It is the last union that is causing the issue (the job
being un-submittable). If I don't combineAtEnd, I can go higher (at least deploy
the job), all the way up to 63.

After that it starts failing with "too many open files" in RocksDB (which I can
understand, and which is at least better than silently not accepting my job).

Caused by: java.lang.RuntimeException: Error while opening RocksDB instance.
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:306)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:821)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:118)
... 4 more
Caused by: org.rocksdb.RocksDBException: IO error: 
/var/folders/l1/ncffkbq11_lg6tjk_3cvc_n0gn/T/flink-io-45a78866-a9da-40ca-be51-a894c4fac9be/3815eb68c3777ba4f504e8529db6e145/StreamSource_39_0/dummy_state/7ff48c49-b6ce-4de8-ba7e-8a240b181ae2/db/MANIFEST-01:
 Too many open files
at org.rocksdb.RocksDB.open(Native Method)
at org.rocksdb.RocksDB.open(RocksDB.java:239)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:304)
... 6 more
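
For context, a sketch of the RocksDB-level knob behind this error, using the plain
rocksdbjni API (raising the OS ulimit is the other half of the fix; how, or whether,
this option can be passed through the Flink state backend depends on the Flink
version and is not shown here, so treat that part as an assumption):

// Sketch: capping the file handles a single RocksDB instance may hold open.
// Paths are made up; resource cleanup is omitted for brevity.
import org.rocksdb.{Options, RocksDB}

object MaxOpenFilesSketch {
  def main(args: Array[String]): Unit = {
    RocksDB.loadLibrary()
    val opts = new Options()
      .setCreateIfMissing(true)
      .setMaxOpenFiles(256) // cap file handles held by this RocksDB instance
    val db = RocksDB.open(opts, "/tmp/rocksdb-open-files-sketch")
    db.put("k".getBytes, "v".getBytes)
    println(new String(db.get("k".getBytes)))
  }
}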



> On Jan 23, 2017, at 8:20 AM, Abhishek R. Singh 
> <abhis...@tetrationanalytics.com> wrote:
> 
> Is there a limit on how many DataStreams can be defined in a streaming 
> program?
> 
> Looks like flink has problems handling too many data streams? I simplified my 
> topology further. For eg, this works (parallelism of 4)
> 
> 
> 
> However, when I try to go beyond 51 (found empirically by parametrizing 
> nParts), it barfs again. Submission fails, it wants me to increase 
> akka.client.timeout
> 
> Here is the reduced code for repro (union at the end itself is not an issue). 
> It is the parallelism of the first for loop:
> int nParts = cfg.getInt("dummyPartitions", 4);
> boolean combineAtEnd = cfg.getBoolean("dummyCombineAtEnd", true);
> 
> // create lots of streams
> List<SingleOutputStreamOperator> streams = new ArrayList<>(nParts);
> for (int i = 0; i < nParts; i++) {
>   streams.add(env
>   .readFile(
>   new TextInputFormat(new Path("/tmp/input")),
>   "/tmp/input",
>   FileProcessingMode.PROCESS_CONTINUOUSLY,
>   1000,
>   FilePathFilter.createDefaultFilter())
>   .setParallelism(1).name("src"));
> }
> 
> if (combineAtEnd == true) {
>   DataStream combined = streams.get(0);
>   for (int i = 1; i < nParts; i++) {
> combined = combined.union(streams.get(i));
>   }
>   combined.print().setParallelism(1);
> } else { // die parallel
>   for (int i = 1; i < nParts; i++) {
> streams.get(i).print();
>   }
> }
> 
> 
>> On Jan 23, 2017, at 6:14 AM, Abhishek R. Singh 
>> <abhis...@tetrationanalytics.com <mailto:abhis...@tetrationanalytics.com>> 
>> wrote:
>> 
>> I even make it 10 minutes:
>> 
>> akka.client.timeout: 600s
>> 
>> But doesn’t feel like it is taking effect. It still comes out at about the 
>> same time with the same error.
>> 
>> -Abhishek-
>> 
>>> On Jan 23, 2017, at 6:04 AM, Abhishek R. Singh 
>>> <abhis...@tetrationanalytics.com <mailto:abhis...@tetrationanalytics.com>> 
>>> wrote:
>>> 
>>> yes, I had increased it to 5 minutes. It just sits there and bails out 
>>> again.
>>> 
>>>> On Jan 23, 2017, at 1:47 AM, Jonas <jo...@huntun.de 
>>>> <mailto:jo...@huntun.de>> wrote:
>>>> 
>>>> The exception says that
>>>> 
>>>> Did you already try that?
>>>> 
>>>> 
>>>> 
>>>> --
>>>> View this message in context: 
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/weird-client-failure-timeout-tp11201p11204.html
>>>>  
>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/weird-client-failure-timeout-tp11201p11204.html>
>>>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>>>> at Nabble.com <http://nabble.com/>.
>>> 
>> 
> 



Re: weird client failure/timeout

2017-01-23 Thread Abhishek R. Singh
Is there a limit on how many DataStreams can be defined in a streaming program?

It looks like Flink has problems handling too many data streams. I simplified my
topology further; for example, this works (parallelism of 4):



However, when I try to go beyond 51 (found empirically by parametrizing nParts),
it barfs again: submission fails and asks me to increase akka.client.timeout.

Here is the reduced code for the repro (the union at the end is not itself the
issue; it is the number of streams created in the first for loop that matters):
int nParts = cfg.getInt("dummyPartitions", 4);
boolean combineAtEnd = cfg.getBoolean("dummyCombineAtEnd", true);

// create lots of (file-monitoring) source streams
List<SingleOutputStreamOperator<String>> streams = new ArrayList<>(nParts);
for (int i = 0; i < nParts; i++) {
  streams.add(env
      .readFile(
          new TextInputFormat(new Path("/tmp/input")),
          "/tmp/input",
          FileProcessingMode.PROCESS_CONTINUOUSLY,
          1000,
          FilePathFilter.createDefaultFilter())
      .setParallelism(1).name("src"));
}

if (combineAtEnd) {
  // fan everything into a single printing sink
  DataStream<String> combined = streams.get(0);
  for (int i = 1; i < nParts; i++) {
    combined = combined.union(streams.get(i));
  }
  combined.print().setParallelism(1);
} else {
  // print each stream separately (starting at 0 so stream 0 is not dropped)
  for (int i = 0; i < nParts; i++) {
    streams.get(i).print();
  }
}


> On Jan 23, 2017, at 6:14 AM, Abhishek R. Singh 
> <abhis...@tetrationanalytics.com> wrote:
> 
> I even make it 10 minutes:
> 
> akka.client.timeout: 600s
> 
> But doesn’t feel like it is taking effect. It still comes out at about the 
> same time with the same error.
> 
> -Abhishek-
> 
>> On Jan 23, 2017, at 6:04 AM, Abhishek R. Singh 
>> <abhis...@tetrationanalytics.com <mailto:abhis...@tetrationanalytics.com>> 
>> wrote:
>> 
>> yes, I had increased it to 5 minutes. It just sits there and bails out again.
>> 
>>> On Jan 23, 2017, at 1:47 AM, Jonas <jo...@huntun.de 
>>> <mailto:jo...@huntun.de>> wrote:
>>> 
>>> The exception says that
>>> 
>>> Did you already try that?
>>> 
>>> 
>>> 
>>> --
>>> View this message in context: 
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/weird-client-failure-timeout-tp11201p11204.html
>>>  
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/weird-client-failure-timeout-tp11201p11204.html>
>>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>>> at Nabble.com <http://nabble.com/>.
>> 
> 



Re: weird client failure/timeout

2017-01-23 Thread Abhishek R. Singh
I even made it 10 minutes:

akka.client.timeout: 600s

But it doesn't feel like it is taking effect; it still fails at about the same
time with the same error.

-Abhishek-

> On Jan 23, 2017, at 6:04 AM, Abhishek R. Singh 
> <abhis...@tetrationanalytics.com> wrote:
> 
> yes, I had increased it to 5 minutes. It just sits there and bails out again.
> 
>> On Jan 23, 2017, at 1:47 AM, Jonas <jo...@huntun.de> wrote:
>> 
>> The exception says that
>> 
>> Did you already try that?
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/weird-client-failure-timeout-tp11201p11204.html
>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>> at Nabble.com.
> 



Re: weird client failure/timeout

2017-01-23 Thread Abhishek R. Singh
yes, I had increased it to 5 minutes. It just sits there and bails out again.

> On Jan 23, 2017, at 1:47 AM, Jonas  wrote:
> 
> The exception says that
> 
> Did you already try that?
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/weird-client-failure-timeout-tp11201p11204.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: weird client failure/timeout

2017-01-23 Thread Abhishek R. Singh
I am using version 1.1.4 (latest stable)

> On Jan 23, 2017, at 12:41 AM, Abhishek R. Singh 
> <abhis...@tetrationanalytics.com> wrote:
> 
> I am trying to construct a topology like this (shown for parallelism of 4) - 
> basically n parallel windowed processing sub-pipelines with single source and 
> single sink:
> 
> 
> 
> I am getting the following  failure (if I go beyond 28 - found empirically 
> using binary search). There is nothing in the job manager logs to 
> troubleshoot this further.
> 
> Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
> Using address 127.0.0.1:6123 to connect to JobManager.
> JobManager web interface address http://127.0.0.1:10620 
> <http://127.0.0.1:10620/>
> Starting execution of program
> Submitting job with JobID: 27ae3db2946aac3336941bdfa184e537. Waiting for job 
> completion.
> Connected to JobManager at 
> Actor[akka.tcp://flink@127.0.0.1:6123/user/jobmanager#2043695445 
> ]
> 
> 
>  The program finished with the following exception:
> 
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Communication with JobManager failed: Job submission to the 
> JobManager timed out. You may increase 'akka.client.timeout' in case the 
> JobManager needs more time to configure and confirm the job submission.
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:410)
> at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:383)
> at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
> at 
> com.tetration.pipeline.IngestionPipelineMain.main(IngestionPipelineMain.java:116)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:510)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:404)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:321)
> at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
> at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: 
> Communication with JobManager failed: Job submission to the JobManager timed 
> out. You may increase 'akka.client.timeout' in case the JobManager needs more 
> time to configure and confirm the job submission.
> at 
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:137)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:406)
> ... 15 more
> Caused by: 
> org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job 
> submission to the JobManager timed out. You may increase 
> 'akka.client.timeout' in case the JobManager needs more time to configure and 
> confirm the job submission.
> at 
> org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:264)
> at 
> org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
> at 
> org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
> at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> 

weird client failure/timeout

2017-01-23 Thread Abhishek R. Singh
I am trying to construct a topology like this (shown for parallelism of 4):
basically n parallel windowed processing sub-pipelines with a single source and a
single sink:



I am getting the following failure (if I go beyond 28, found empirically using
binary search). There is nothing in the JobManager logs to troubleshoot this
further.

Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
Using address 127.0.0.1:6123 to connect to JobManager.
JobManager web interface address http://127.0.0.1:10620
Starting execution of program
Submitting job with JobID: 27ae3db2946aac3336941bdfa184e537. Waiting for job 
completion.
Connected to JobManager at 
Actor[akka.tcp://flink@127.0.0.1:6123/user/jobmanager#2043695445]


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Communication with JobManager failed: Job submission to the 
JobManager timed out. You may increase 'akka.client.timeout' in case the 
JobManager needs more time to configure and confirm the job submission.
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:410)
at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:383)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
at 
com.tetration.pipeline.IngestionPipelineMain.main(IngestionPipelineMain.java:116)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:510)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:404)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:321)
at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication 
with JobManager failed: Job submission to the JobManager timed out. You may 
increase 'akka.client.timeout' in case the JobManager needs more time to 
configure and confirm the job submission.
at 
org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:137)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:406)
... 15 more
Caused by: 
org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job 
submission to the JobManager timed out. You may increase 'akka.client.timeout' 
in case the JobManager needs more time to configure and confirm the job 
submission.
at 
org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:264)
at 
org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
at 
org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The code to reproduce this problem is shown below (the Flink job submission itself
fails; the code has been dumbed down to focus on the topology I am trying to
build):

int nParts = cfg.getInt("dummyPartitions", 4);

SingleOutputStreamOperator in = env.socketTextStream("localhost",
cfg.getInt("dummyPort", 16408)).setParallelism(1).name("src");

SingleOutputStreamOperator 
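
The original code is cut off at this point in the archive. Purely to illustrate the
shape of the topology described above (one source fanned out into n keyed, windowed
sub-pipelines and unioned back into a single sink), here is a rough sketch with
made-up names and routing; it is not a reconstruction of the missing code:

// Illustration only: single source, n windowed sub-pipelines, single sink.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object FanOutFanInSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val nParts = 4 // the original read this from config ("dummyPartitions")

    val in: DataStream[String] = env.socketTextStream("localhost", 16408)

    // n parallel keyed/windowed sub-pipelines over the same source
    val branches: Seq[DataStream[(Int, Int)]] = (0 until nParts).map { i =>
      in.filter(_.hashCode.abs % nParts == i) // crude per-branch routing
        .map(line => (i, line.length))
        .keyBy(0)
        .timeWindow(Time.seconds(10))
        .sum(1)
    }

    // fan back in to a single printing sink
    branches.reduce(_ union _).print().setParallelism(1)

    env.execute("fan-out/fan-in sketch")
  }
}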

Re: checkpoint notifier not found?

2016-12-14 Thread Abhishek R. Singh
Is this more appropriate for the dev list? Anyway, here is my first:

https://github.com/apache/flink/pull/3006

> On Dec 14, 2016, at 2:38 AM, Robert Metzger <rmetz...@apache.org> wrote:
> 
> Hi Abhishek,
> you can not push to the Flink repository directly. Only Flink committers are 
> allowed to do that.
> But you can fork the Flink repository on github to your own GitHub account 
> and then push the changes to your Github.
> Then, you can create a pull request to offer those changes to the main Flink 
> repository.
> 
> On Wed, Dec 14, 2016 at 1:20 AM, Abhishek R. Singh 
> <abhis...@tetrationanalytics.com <mailto:abhis...@tetrationanalytics.com>> 
> wrote:
> Not sure how to go from here. How do I create a PR for this?
> 
> $ git branch
> * doc-checkpoint-notify
>   master
> 
> 
> $ git push origin master
> remote: Permission to apache/flink.git denied to abhishsi.
> fatal: unable to access 'https://github.com/apache/flink.git/': 
> <https://github.com/apache/flink.git/':> The requested URL returned error: 403
> 
> 
> $ git diff HEAD^1
> diff --git a/docs/dev/state.md <http://state.md/> b/docs/dev/state.md 
> <http://state.md/>
> index 37de0a8..a753ed1 100644
> --- a/docs/dev/state.md <http://state.md/>
> +++ b/docs/dev/state.md <http://state.md/>
> @@ -281,7 +281,7 @@ public static class CounterSource
>  }
>  {% endhighlight %}
>  
> -Some operators might need the information when a checkpoint is fully 
> acknowledged by Flink to communicate that with the outside world. In this 
> case see the `flink.streaming.api.checkpoint.CheckpointNotifier` interface.
> +Some operators might need the information when a checkpoint is fully 
> acknowledged by Flink to communicate that with the outside world. In this 
> case see the `org.apache.flink.runtime.state.CheckpointListener` interface.
>  
>  ## State Checkpoints in Iterative Jobs
>  
> 
> 
>> On Dec 12, 2016, at 3:11 PM, Abhishek R. Singh 
>> <abhis...@tetrationanalytics.com <mailto:abhis...@tetrationanalytics.com>> 
>> wrote:
>> 
>> https://issues.apache.org/jira/browse/FLINK-5323 
>> <https://issues.apache.org/jira/browse/FLINK-5323>
>> 
>>> On Dec 12, 2016, at 5:37 AM, Till Rohrmann <trohrm...@apache.org 
>>> <mailto:trohrm...@apache.org>> wrote:
>>> 
>>> Hi Abhishek,
>>> 
>>> great to hear that you like to become part of the Flink community. Here are 
>>> some information for how to contribute [1].
>>> 
>>> [1] http://flink.apache.org/how-to-contribute.html 
>>> <http://flink.apache.org/how-to-contribute.html>
>>> 
>>> Cheers,
>>> Till
>>> 
>>> On Mon, Dec 12, 2016 at 12:36 PM, Abhishek Singh 
>>> <abhis...@tetrationanalytics.com <mailto:abhis...@tetrationanalytics.com>> 
>>> wrote:
>>> Will be happy to. Could you guide me a bit in terms of what I need to do?
>>> 
>>> I am a newbie to open source contributing. And currently at Frankfurt 
>>> airport. When I hit ground will be happy to contribute back. Love the 
>>> project !!
>>> 
>>> Thanks for the awesomeness. 
>>> 
>>> 
>>> On Mon, Dec 12, 2016 at 12:29 PM Stephan Ewen <se...@apache.org 
>>> <mailto:se...@apache.org>> wrote:
>>> Thanks for reporting this.
>>> It would be awesome if you could file a JIRA or a pull request for fixing 
>>> the docs for that.
>>> 
>>> On Sat, Dec 10, 2016 at 2:02 AM, Abhishek R. Singh 
>>> <abhis...@tetrationanalytics.com <mailto:abhis...@tetrationanalytics.com>> 
>>> wrote:
>>> I was following the official documentation: 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>>>  
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html>
>>> 
>>> Looks like this is the right one to be using: import 
>>> org.apache.flink.runtime.state.CheckpointListener;
>>> 
>>> -Abhishek-
>>> 
>>>> On Dec 9, 2016, at 4:30 PM, Abhishek R. Singh 
>>>> <abhis...@tetrationanalytics.com <mailto:abhis...@tetrationanalytics.com>> 
>>>> wrote:
>>>> 
>>>> I can’t seem to find CheckpointNotifier. Appreciate help !
>>>> 
>>>> CheckpointNotifier is not a member of package 
>>>> org.apache.flink.streaming.api.checkpoint
>>>> 
>>>> From my pom.xml:
>>>> 
>>>> 
>>>> org.apache.flink
>>>> flink-scala_2.11
>>>> 1.1.3
>>>> 
>>>> 
>>>> org.apache.flink
>>>> flink-streaming-scala_2.11
>>>> 1.1.3
>>>> 
>>>> 
>>>> org.apache.flink
>>>> flink-clients_2.11
>>>> 1.1.3
>>>> 
>>>> 
>>>> org.apache.flink
>>>> flink-statebackend-rocksdb_2.11
>>>> 1.1.3
>>>> 
>>> 
>>> 
>>> 
>> 
> 
> 



Re: checkpoint notifier not found?

2016-12-13 Thread Abhishek R. Singh
Not sure how to go from here. How do I create a PR for this?

$ git branch
* doc-checkpoint-notify
  master


$ git push origin master
remote: Permission to apache/flink.git denied to abhishsi.
fatal: unable to access 'https://github.com/apache/flink.git/': The requested 
URL returned error: 403


$ git diff HEAD^1
diff --git a/docs/dev/state.md b/docs/dev/state.md
index 37de0a8..a753ed1 100644
--- a/docs/dev/state.md
+++ b/docs/dev/state.md
@@ -281,7 +281,7 @@ public static class CounterSource
 }
 {% endhighlight %}
 
-Some operators might need the information when a checkpoint is fully 
acknowledged by Flink to communicate that with the outside world. In this case 
see the `flink.streaming.api.checkpoint.CheckpointNotifier` interface.
+Some operators might need the information when a checkpoint is fully 
acknowledged by Flink to communicate that with the outside world. In this case 
see the `org.apache.flink.runtime.state.CheckpointListener` interface.
 
 ## State Checkpoints in Iterative Jobs
 


> On Dec 12, 2016, at 3:11 PM, Abhishek R. Singh 
> <abhis...@tetrationanalytics.com> wrote:
> 
> https://issues.apache.org/jira/browse/FLINK-5323 
> <https://issues.apache.org/jira/browse/FLINK-5323>
> 
>> On Dec 12, 2016, at 5:37 AM, Till Rohrmann <trohrm...@apache.org 
>> <mailto:trohrm...@apache.org>> wrote:
>> 
>> Hi Abhishek,
>> 
>> great to hear that you like to become part of the Flink community. Here are 
>> some information for how to contribute [1].
>> 
>> [1] http://flink.apache.org/how-to-contribute.html 
>> <http://flink.apache.org/how-to-contribute.html>
>> 
>> Cheers,
>> Till
>> 
>> On Mon, Dec 12, 2016 at 12:36 PM, Abhishek Singh 
>> <abhis...@tetrationanalytics.com <mailto:abhis...@tetrationanalytics.com>> 
>> wrote:
>> Will be happy to. Could you guide me a bit in terms of what I need to do?
>> 
>> I am a newbie to open source contributing. And currently at Frankfurt 
>> airport. When I hit ground will be happy to contribute back. Love the 
>> project !!
>> 
>> Thanks for the awesomeness. 
>> 
>> 
>> On Mon, Dec 12, 2016 at 12:29 PM Stephan Ewen <se...@apache.org 
>> <mailto:se...@apache.org>> wrote:
>> Thanks for reporting this.
>> It would be awesome if you could file a JIRA or a pull request for fixing 
>> the docs for that.
>> 
>> On Sat, Dec 10, 2016 at 2:02 AM, Abhishek R. Singh 
>> <abhis...@tetrationanalytics.com <mailto:abhis...@tetrationanalytics.com>> 
>> wrote:
>> I was following the official documentation: 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html>
>> 
>> Looks like this is the right one to be using: import 
>> org.apache.flink.runtime.state.CheckpointListener;
>> 
>> -Abhishek-
>> 
>>> On Dec 9, 2016, at 4:30 PM, Abhishek R. Singh 
>>> <abhis...@tetrationanalytics.com <mailto:abhis...@tetrationanalytics.com>> 
>>> wrote:
>>> 
>>> I can’t seem to find CheckpointNotifier. Appreciate help !
>>> 
>>> CheckpointNotifier is not a member of package 
>>> org.apache.flink.streaming.api.checkpoint
>>> 
>>> From my pom.xml:
>>> 
>>> 
>>> org.apache.flink
>>> flink-scala_2.11
>>> 1.1.3
>>> 
>>> 
>>> org.apache.flink
>>> flink-streaming-scala_2.11
>>> 1.1.3
>>> 
>>> 
>>> org.apache.flink
>>> flink-clients_2.11
>>> 1.1.3
>>> 
>>> 
>>> org.apache.flink
>>> flink-statebackend-rocksdb_2.11
>>> 1.1.3
>>> 
>> 
>> 
>> 
> 



Re: checkpoint notifier not found?

2016-12-12 Thread Abhishek R. Singh
https://issues.apache.org/jira/browse/FLINK-5323

> On Dec 12, 2016, at 5:37 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> 
> Hi Abhishek,
> 
> great to hear that you like to become part of the Flink community. Here are 
> some information for how to contribute [1].
> 
> [1] http://flink.apache.org/how-to-contribute.html 
> <http://flink.apache.org/how-to-contribute.html>
> 
> Cheers,
> Till
> 
> On Mon, Dec 12, 2016 at 12:36 PM, Abhishek Singh 
> <abhis...@tetrationanalytics.com <mailto:abhis...@tetrationanalytics.com>> 
> wrote:
> Will be happy to. Could you guide me a bit in terms of what I need to do?
> 
> I am a newbie to open source contributing. And currently at Frankfurt 
> airport. When I hit ground will be happy to contribute back. Love the project 
> !!
> 
> Thanks for the awesomeness. 
> 
> 
> On Mon, Dec 12, 2016 at 12:29 PM Stephan Ewen <se...@apache.org 
> <mailto:se...@apache.org>> wrote:
> Thanks for reporting this.
> It would be awesome if you could file a JIRA or a pull request for fixing the 
> docs for that.
> 
> On Sat, Dec 10, 2016 at 2:02 AM, Abhishek R. Singh 
> <abhis...@tetrationanalytics.com <mailto:abhis...@tetrationanalytics.com>> 
> wrote:
> I was following the official documentation: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html>
> 
> Looks like this is the right one to be using: import 
> org.apache.flink.runtime.state.CheckpointListener;
> 
> -Abhishek-
> 
>> On Dec 9, 2016, at 4:30 PM, Abhishek R. Singh 
>> <abhis...@tetrationanalytics.com <mailto:abhis...@tetrationanalytics.com>> 
>> wrote:
>> 
>> I can’t seem to find CheckpointNotifier. Appreciate help !
>> 
>> CheckpointNotifier is not a member of package 
>> org.apache.flink.streaming.api.checkpoint
>> 
>> From my pom.xml:
>> 
>> 
>> org.apache.flink
>> flink-scala_2.11
>> 1.1.3
>> 
>> 
>> org.apache.flink
>> flink-streaming-scala_2.11
>> 1.1.3
>> 
>> 
>> org.apache.flink
>> flink-clients_2.11
>> 1.1.3
>> 
>> 
>> org.apache.flink
>> flink-statebackend-rocksdb_2.11
>> 1.1.3
>> 
> 
> 
> 



Re: checkpoint notifier not found?

2016-12-09 Thread Abhishek R. Singh
I was following the official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html

Looks like this is the right one to be using: import 
org.apache.flink.runtime.state.CheckpointListener;
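
For illustration, a minimal sketch of how that interface is typically implemented;
the class and field names are made up, and in a real exactly-once sink the buffered
records would themselves have to be part of checkpointed state:

// Sketch: a sink that is told when a checkpoint is fully acknowledged.
// Only CheckpointListener and RichSinkFunction come from Flink here.
import org.apache.flink.runtime.state.CheckpointListener
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

import scala.collection.mutable.ArrayBuffer

class BufferingSink extends RichSinkFunction[String] with CheckpointListener {
  // staged records; for real exactly-once this buffer must be checkpointed too
  private val pending = ArrayBuffer.empty[String]

  override def invoke(value: String): Unit = {
    pending += value
  }

  override def notifyCheckpointComplete(checkpointId: Long): Unit = {
    // called once Flink has fully acknowledged the checkpoint -- a safe point
    // to communicate with the outside world
    pending.foreach(v => println(s"commit after checkpoint $checkpointId: $v"))
    pending.clear()
  }
}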

-Abhishek-

> On Dec 9, 2016, at 4:30 PM, Abhishek R. Singh 
> <abhis...@tetrationanalytics.com> wrote:
> 
> I can’t seem to find CheckpointNotifier. Appreciate help !
> 
> CheckpointNotifier is not a member of package 
> org.apache.flink.streaming.api.checkpoint
> 
> From my pom.xml:
> 
> 
> org.apache.flink
> flink-scala_2.11
> 1.1.3
> 
> 
> org.apache.flink
> flink-streaming-scala_2.11
> 1.1.3
> 
> 
> org.apache.flink
> flink-clients_2.11
> 1.1.3
> 
> 
> org.apache.flink
> flink-statebackend-rocksdb_2.11
> 1.1.3
> 



checkpoint notifier not found?

2016-12-09 Thread Abhishek R. Singh
I can’t seem to find CheckpointNotifier. Appreciate any help!

CheckpointNotifier is not a member of package 
org.apache.flink.streaming.api.checkpoint

From my pom.xml:


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.11</artifactId>
  <version>1.1.3</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.1.3</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.11</artifactId>
  <version>1.1.3</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
  <version>1.1.3</version>
</dependency>


Re: StructuredStreaming status

2016-10-19 Thread Abhishek R. Singh
It's not so much about latency, actually. The bigger rub for me is that the state
has to be reshuffled every micro/mini-batch (unless I am misunderstanding the
Spark 2.0 state model).

The operator model avoids this by preserving state locality. Event-time processing
and state purging are the other essentials (which are thankfully getting
addressed).

Any guidance on (timelines for) expected exit from alpha state would also be 
greatly appreciated.

-Abhishek-

> On Oct 19, 2016, at 5:36 PM, Matei Zaharia  wrote:
> 
> I'm also curious whether there are concerns other than latency with the way 
> stuff executes in Structured Streaming (now that the time steps don't have to 
> act as triggers), as well as what latency people want for various apps.
> 
> The stateful operator designs for streaming systems aren't inherently 
> "better" than micro-batching -- they lose a lot of stuff that is possible in 
> Spark, such as load balancing work dynamically across nodes, speculative 
> execution for stragglers, scaling clusters up and down elastically, etc. 
> Moreover, Spark itself could execute the current model with much lower 
> latency. The question is just what combinations of latency, throughput, fault 
> recovery, etc to target.
> 
> Matei
> 
>> On Oct 19, 2016, at 2:18 PM, Amit Sela wrote:
>> 
>> 
>> 
>> On Thu, Oct 20, 2016 at 12:07 AM Shivaram Venkataraman wrote:
>> At the AMPLab we've been working on a research project that looks at
>> just the scheduling latencies and on techniques to get lower
>> scheduling latency. It moves away from the micro-batch model, but
>> reuses the fault tolerance etc. in Spark. However we haven't yet
>> figure out all the parts in integrating this with the rest of
>> structured streaming. I'll try to post a design doc / SIP about this
>> soon.
>> 
>> On a related note - are there other problems users face with
>> micro-batch other than latency ?
>> I think that the fact that they serve as an output trigger is a problem, but 
>> Structured Streaming seems to resolve this now.  
>> 
>> Thanks
>> Shivaram
>> 
>> On Wed, Oct 19, 2016 at 1:29 PM, Michael Armbrust wrote:
>> > I know people are seriously thinking about latency.  So far that has not
>> > been the limiting factor in the users I've been working with.
>> >
>> > On Wed, Oct 19, 2016 at 1:11 PM, Cody Koeninger wrote:
>> >>
>> >> Is anyone seriously thinking about alternatives to microbatches?
>> >>
>> >> On Wed, Oct 19, 2016 at 2:45 PM, Michael Armbrust wrote:
>> >> > Anything that is actively being designed should be in JIRA, and it seems
>> >> > like you found most of it.  In general, release windows can be found on
>> >> > the
>> >> > wiki.
>> >> >
>> >> > 2.1 has a lot of stability fixes as well as the kafka support you
>> >> > mentioned.
>> >> > It may also include some of the following.
>> >> >
>> >> > The items I'd like to start thinking about next are:
>> >> >  - Evicting state from the store based on event time watermarks
>> >> >  - Sessionization (grouping together related events by key / eventTime)
>> >> >  - Improvements to the query planner (remove some of the restrictions on
>> >> > what queries can be run).
>> >> >
>> >> > This is roughly in order based on what I've been hearing users hit the
>> >> > most.
>> >> > Would love more feedback on what is blocking real use cases.
>> >> >
>> >> > On Tue, Oct 18, 2016 at 1:51 AM, Ofir Manor wrote:
>> >> >>
>> >> >> Hi,
>> >> >> I hope it is the right forum.
>> >> >> I am looking for some information of what to expect from
>> >> >> StructuredStreaming in its next releases to help me choose when / where
>> >> >> to
>> >> >> start using it more seriously (or where to invest in workarounds and
>> >> >> where
>> >> >> to wait). I couldn't find a good place where such planning discussed
>> >> >> for 2.1
>> >> >> (like, for example ML and SPARK-15581).
>> >> >> I'm aware of the 2.0 documented limits
>> >> >>
>> >> >> (http://spark.apache.org/docs/2.0.1/structured-streaming-programming-guide.html#unsupported-operations
>> >> >>  
>> >> >> ),
>> >> >> like no support for multiple aggregations levels, joins are strictly to
>> >> >> a
>> >> >> static dataset (no SCD or stream-stream) etc, limited sources / sinks
>> >> >> (like
>> >> >> no sink for interactive queries) etc etc
>> >> >> I'm also aware of some changes that have landed in master, like the new
>> >> >> Kafka 0.10 source (and its on-going improvements) in SPARK-15406, the
>> >> >> metrics in SPARK-17731, and 

flink async snapshots

2016-05-19 Thread Abhishek R. Singh
If you can take atomic in-memory copies, then it works (at the cost of doubling
your instantaneous memory). For larger state (say, RocksDB), won't you have to
stop the world (take an atomic snapshot) and make a copy? Doesn't that make it
synchronous, instead of background/async?

Sorry, Stavros, for bumping into your thread. This should probably have been a new
thread (I changed the subject in an attempt to fix that up).

-Abhishek-
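
As a toy illustration of the point discussed below (this is not Flink's mechanism):
if the live state is an immutable, copy-on-write style structure, "taking" a
snapshot is just capturing a reference, and the slow write-out can run in the
background while the live state keeps mutating.

// Toy only: asynchronous snapshot over immutable state. Nothing here is Flink code.
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object AsyncSnapshotSketch {
  @volatile private var state = Map.empty[String, Long] // immutable map, swapped on update

  def update(key: String): Unit = synchronized {
    state = state.updated(key, state.getOrElse(key, 0L) + 1)
  }

  def snapshot(epoch: Long): Future[Unit] = {
    val frozen = state // O(1) "copy": just a reference to an immutable value
    Future {
      // pretend this is the slow part: shipping the snapshot to a store backend
      println(s"epoch $epoch persisted ${frozen.size} keys")
    }
  }
}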

> On May 19, 2016, at 11:42 AM, Paris Carbone <par...@kth.se> wrote:
> 
> Hi Abhishek, 
> I don’t see the problem there (also this is unrelated to the snapshotting 
> protocol). 
> Intuitively, if you submit a copy of your state (full or delta) for a 
> snapshot version/epoch to a store backend and validate the full snapshot for 
> that version when you eventually receive the acknowledgements this still 
> works fine. Am I missing something?
> 
>> On 19 May 2016, at 20:36, Abhishek R. Singh <abhis...@tetrationanalytics.com 
>> <mailto:abhis...@tetrationanalytics.com>> wrote:
>> 
>> I was wondering how checkpoints can be async? Because your state is 
>> constantly mutating. You probably need versioned state, or immutable data 
>> structs?
>> 
>> -Abhishek-
>> 
>>> On May 19, 2016, at 11:14 AM, Paris Carbone <par...@kth.se 
>>> <mailto:par...@kth.se>> wrote:
>>> 
>>> Hi Stavros,
>>> 
>>> Currently, rollback failure recovery in Flink works in the pipeline level, 
>>> not in the task level (see Millwheel [1]). It further builds on repayable 
>>> stream logs (i.e. Kafka), thus, there is no need for 3pc or backup in the 
>>> pipeline sources. You can also check this presentation [2] which explains 
>>> the basic concepts more in detail I hope. Mind that many upcoming 
>>> optimisation opportunities are going to be addressed in the not so 
>>> long-term Flink roadmap.
>>> 
>>> Paris
>>> 
>>> [1] 
>>> http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/41378.pdf
>>>  
>>> <http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/41378.pdf>
>>> [2] 
>>> http://www.slideshare.net/ParisCarbone/tech-talk-google-on-flink-fault-tolerance-and-ha
>>>  
>>> <http://www.slideshare.net/ParisCarbone/tech-talk-google-on-flink-fault-tolerance-and-ha>
>>> 
>>>> On 19 May 2016, at 19:43, Stavros Kontopoulos <st.kontopou...@gmail.com 
>>>> <mailto:st.kontopou...@gmail.com>> wrote:
>>>> 
>>>> Cool thnx. So if a checkpoint expires the pipeline will block or fail in 
>>>> total or only the specific task related to the operator (running along 
>>>> with the checkpoint task) or nothing happens?
>>>> 
>>>> On Tue, May 17, 2016 at 3:49 PM, Robert Metzger <rmetz...@apache.org 
>>>> <mailto:rmetz...@apache.org>> wrote:
>>>> Hi Stravos,
>>>> 
>>>> I haven't implemented our checkpointing mechanism and I didn't participate 
>>>> in the design decisions while implementing it, so I can not compare it in 
>>>> detail to other approaches.
>>>> 
>>>> From a "does it work perspective": Checkpoints are only confirmed if all 
>>>> parallel subtasks successfully created a valid snapshot of the state. So 
>>>> if there is a failure in the checkpointing mechanism, no valid checkpoint 
>>>> will be created. The system will recover from the last valid checkpoint.
>>>> There is a timeout for checkpoints. So if a barrier doesn't pass through 
>>>> the system for a certain period of time, the checkpoint is cancelled. The 
>>>> default timeout is 10 minutes.
>>>> 
>>>> Regards,
>>>> Robert
>>>> 
>>>> 
>>>> On Mon, May 16, 2016 at 1:22 PM, Stavros Kontopoulos 
>>>> <st.kontopou...@gmail.com <mailto:st.kontopou...@gmail.com>> wrote:
>>>> Hi,
>>>> 
>>>> I was looking into the flink snapshotting algorithm details also mentioned 
>>>> here:
>>>> http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
>>>>  
>>>> <http://data-artisans.com/high-throughput-low-latency-

Re: flink snapshotting fault-tolerance

2016-05-19 Thread Abhishek R. Singh
I was wondering how checkpoints can be async, given that the state is constantly
mutating. You probably need versioned state, or immutable data structures?

-Abhishek-

> On May 19, 2016, at 11:14 AM, Paris Carbone  wrote:
> 
> Hi Stavros,
> 
> Currently, rollback failure recovery in Flink works in the pipeline level, 
> not in the task level (see Millwheel [1]). It further builds on repayable 
> stream logs (i.e. Kafka), thus, there is no need for 3pc or backup in the 
> pipeline sources. You can also check this presentation [2] which explains the 
> basic concepts more in detail I hope. Mind that many upcoming optimisation 
> opportunities are going to be addressed in the not so long-term Flink roadmap.
> 
> Paris
> 
> [1] 
> http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/41378.pdf
>  
> 
> [2] 
> http://www.slideshare.net/ParisCarbone/tech-talk-google-on-flink-fault-tolerance-and-ha
> 
>> On 19 May 2016, at 19:43, Stavros Kontopoulos wrote:
>> 
>> Cool thnx. So if a checkpoint expires the pipeline will block or fail in 
>> total or only the specific task related to the operator (running along with 
>> the checkpoint task) or nothing happens?
>> 
>> On Tue, May 17, 2016 at 3:49 PM, Robert Metzger wrote:
>> Hi Stravos,
>> 
>> I haven't implemented our checkpointing mechanism and I didn't participate 
>> in the design decisions while implementing it, so I can not compare it in 
>> detail to other approaches.
>> 
>> From a "does it work perspective": Checkpoints are only confirmed if all 
>> parallel subtasks successfully created a valid snapshot of the state. So if 
>> there is a failure in the checkpointing mechanism, no valid checkpoint will 
>> be created. The system will recover from the last valid checkpoint.
>> There is a timeout for checkpoints. So if a barrier doesn't pass through the 
>> system for a certain period of time, the checkpoint is cancelled. The 
>> default timeout is 10 minutes.
>> 
>> Regards,
>> Robert
>> 
>> 
>> On Mon, May 16, 2016 at 1:22 PM, Stavros Kontopoulos wrote:
>> Hi,
>> 
>> I was looking into the flink snapshotting algorithm details also mentioned 
>> here:
>> http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
>>  
>> 
>> https://blog.acolyer.org/2015/08/19/asynchronous-distributed-snapshots-for-distributed-dataflows/
>>  
>> 
>> http://mail-archives.apache.org/mod_mbox/flink-user/201601.mbox/%3CCANC1h_s6MCWSuDf2zSnEeD66LszDoLx0jt64++0kBOKTjkAv7w%40mail.gmail.com%3E
>>  
>> 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/About-exactly-once-question-td2545.html
>>  
>> 
>> 
>> From other sources i understand that it assumes no failures to work for 
>> message delivery or for example a process hanging for ever:
>> https://en.wikipedia.org/wiki/Snapshot_algorithm 
>> 
>> https://blog.acolyer.org/2015/04/22/distributed-snapshots-determining-global-states-of-distributed-systems/
>>  
>> 
>> 
>> So my understanding (maybe wrong) is that this is a solution which seems not 
>> to address the fault tolerance issue in a strong manner like for example if 
>> it was to use a 3pc protocol for local state propagation and global 
>> agreement. I know the latter is not efficient just mentioning it for 
>> comparison. 
>> 
>> How the algorithm behaves in practical terms under the presence of its own 
>> failures (this is a background process collecting partial states)? Are there 
>> timeouts for reaching a barrier?
>> 
>> PS. have not looked deep into the code details yet, planning to.
>> 
>> Best,
>> Stavros
>> 
>> 
>> 
> 



custom sources

2016-05-17 Thread Abhishek R. Singh
Hi,

Can we define custom sources in Flink? Can we control the barriers (and thus
checkpoints) so they happen at good watermark points?
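
For what it's worth, a rough sketch of a user-defined source in the DataStream API
(illustrative names only, no real external system). A source does not inject
barriers itself, but by emitting records and advancing its offset under the
checkpoint lock it controls the consistent points at which its state is snapshotted:

// Sketch: emission happens under the checkpoint lock so snapshots see a
// consistent "offset" for this source. Usage: env.addSource(new CountingSource)
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

class CountingSource extends SourceFunction[Long] {
  @volatile private var running = true
  private var offset = 0L // would be saved/restored via the checkpointed-state hooks

  override def run(ctx: SourceContext[Long]): Unit = {
    while (running) {
      ctx.getCheckpointLock.synchronized {
        ctx.collect(offset) // emit and advance atomically w.r.t. checkpoints
        offset += 1
      }
      Thread.sleep(10)
    }
  }

  override def cancel(): Unit = running = false
}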

-Abhishek-

Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread Abhishek R. Singh
You had:

 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

Maybe try:

 val rdd2 = RDD.reduceByKey((x,y) => x+y)
 rdd2.take(3)

-Abhishek-

On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote:

 HI All,
 I have data in RDD as mentioned below:
 
 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))
 
 
 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on 
 Values for each key
 
 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)
 
 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey at 
 console:73
 res:Array[(Int,Int)] = Array()
 
 Command as mentioned
 
 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile
 
 
 Please let me know what is missing in my code, as my resultant Array is empty
 
 
 
 Regards,
 Satish
 





Re: Writing to multiple outputs in Spark

2015-08-14 Thread Abhishek R. Singh
A workaround would be to make multiple passes over the RDD, with each pass writing
its own output.

Or, in a foreachPartition, do it in a single pass (open multiple files per
partition to write out)?
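
A rough sketch of the second option, assuming a pair RDD, local file paths, and
plain java.io (none of this comes from the thread; the Hadoop OutputFormat-based
approaches quoted below are the more principled route):

// Sketch: single pass, one writer per output "bucket" per partition.
import java.io.{File, PrintWriter}

import org.apache.spark.rdd.RDD

import scala.collection.mutable

object MultiOutputSketch {
  def writeByKey(rdd: RDD[(String, String)], baseDir: String): Unit = {
    rdd.foreachPartition { iter =>
      val writers = mutable.Map.empty[String, PrintWriter]
      try {
        iter.foreach { case (key, value) =>
          val w = writers.getOrElseUpdate(key, {
            val dir = new File(baseDir, key)
            dir.mkdirs()
            // one file per (key, partition) so writers never collide
            new PrintWriter(new File(dir, s"part-${java.util.UUID.randomUUID()}"))
          })
          w.println(value)
        }
      } finally {
        writers.values.foreach(_.close())
      }
    }
  }
}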

-Abhishek-

On Aug 14, 2015, at 7:56 AM, Silas Davis si...@silasdavis.net wrote:

 Would it be right to assume that the silence on this topic implies others 
 don't really have this issue/desire?
 
 On Sat, 18 Jul 2015 at 17:24 Silas Davis si...@silasdavis.net wrote:
 tl;dr hadoop and cascading provide ways of writing tuples to multiple output 
 files based on key, but the plain RDD interface doesn't seem to and it should.
 
 I have been looking into ways to write to multiple outputs in Spark. It seems 
 like a feature that is somewhat missing from Spark.
 
 The idea is to partition output and write the elements of an RDD to different 
 locations depending based on the key. For example in a pair RDD your key may 
 be (language, date, userId) and you would like to write separate files to 
 $someBasePath/$language/$date. Then there would be  a version of 
 saveAsHadoopDataset that would be able to multiple location based on key 
 using the underlying OutputFormat. Perahps it would take a pair RDD with keys 
 ($partitionKey, $realKey), so for example ((language, date), userId).
 
 The prior art I have found on this is the following.
 
 Using SparkSQL:
 The 'partitionBy' method of DataFrameWriter: 
 https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
 
 This only works for parquet at the moment.
 
 Using Spark/Hadoop:
 This pull request (with the hadoop1 API,) : 
 https://github.com/apache/spark/pull/4895/files. 
 
 This uses MultipleTextOutputFormat (which in turn uses MultipleOutputFormat) 
 which is part of the old hadoop1 API. It only works for text but could be 
 generalised for any underlying OutputFormat by using MultipleOutputFormat 
 (but only for hadoop1 - which doesn't support ParquetAvroOutputFormat for 
 example)
 
 This gist (With the hadoop2 API): 
 https://gist.github.com/mlehman/df9546f6be2e362bbad2 
 
 This uses MultipleOutputs (available for both the old and new hadoop APIs) 
 and extends saveAsNewHadoopDataset to support multiple outputs. Should work 
 for any underlying OutputFormat. Probably better implemented by extending 
 saveAs[NewAPI]HadoopDataset.
 
 In Cascading:
 Cascading provides PartititionTap: 
 http://docs.cascading.org/cascading/2.5/javadoc/cascading/tap/local/PartitionTap.html
  to do this
 
 So my questions are: is there a reason why Spark doesn't provide this? Does 
 Spark provide similar functionality through some other mechanism? How would 
 it be best implemented?
 
 Since I started composing this message I've had a go at writing an wrapper 
 OutputFormat that writes multiple outputs using hadoop MultipleOutputs but 
 doesn't require modification of the PairRDDFunctions. The principle is 
 similar however. Again it feels slightly hacky to use dummy fields for the 
 ReduceContextImpl, but some of this may be a part of the impedance mismatch 
 between Spark and plain Hadoop... Here is my attempt: 
 https://gist.github.com/silasdavis/d1d1f1f7ab78249af462 
 
 I'd like to see this functionality in Spark somehow but invite suggestion of 
 how best to achieve it.
 
 Thanks,
 Silas



Re: tachyon

2015-08-07 Thread Abhishek R. Singh
Thanks Calvin - much appreciated !

-Abhishek-

On Aug 7, 2015, at 11:11 AM, Calvin Jia jia.cal...@gmail.com wrote:

 Hi Abhishek,
 
 Here's a production use case that may interest you: 
 http://www.meetup.com/Tachyon/events/222485713/
 
 Baidu is using Tachyon to manage more than 100 nodes in production resulting 
 in a 30x performance improvement for their SparkSQL workload. They are also 
 using the tiered storage feature in Tachyon giving them over 2PB of Tachyon 
 managed space.
 
 Hope this helps,
 Calvin
 
 On Fri, Aug 7, 2015 at 10:00 AM, Ted Yu yuzhih...@gmail.com wrote:
 Looks like you would get better response on Tachyon's mailing list:
 
 https://groups.google.com/forum/?fromgroups#!forum/tachyon-users
 
 Cheers
 
 On Fri, Aug 7, 2015 at 9:56 AM, Abhishek R. Singh 
 abhis...@tetrationanalytics.com wrote:
 Do people use Tachyon in production, or is it experimental grade still?
 
 Regards,
 Abhishek
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 
 



tachyon

2015-08-07 Thread Abhishek R. Singh
Do people use Tachyon in production, or is it experimental grade still?

Regards,
Abhishek




Re: How to increase parallelism of a Spark cluster?

2015-08-02 Thread Abhishek R. Singh
I don't know if your assertion/expectation that workers will process things
(multiple partitions) in parallel is really valid, or whether having more
partitions than workers will necessarily help (unless you are memory bound, in
which case more partitions reduce the per-task work size rather than increase
execution parallelism).

[Disclaimer: I am no authority on Spark, but wanted to throw in my spin based on
my own understanding.]

Nothing official about it :)

-abhishek-

 On Jul 31, 2015, at 1:03 PM, Sujit Pal sujitatgt...@gmail.com wrote:
 
 Hello,
 
 I am trying to run a Spark job that hits an external webservice to get back 
 some information. The cluster is 1 master + 4 workers, each worker has 60GB 
 RAM and 4 CPUs. The external webservice is a standalone Solr server, and is 
 accessed using code similar to that shown below.
 
 def getResults(keyValues: Iterator[(String, Array[String])]):
 Iterator[(String, String)] = {
 val solr = new HttpSolrClient()
 initializeSolrParameters(solr)
 keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
 }
 myRDD.repartition(10)
  .mapPartitions(keyValues => getResults(keyValues))
  
 The mapPartitions does some initialization to the SolrJ client per partition 
 and then hits it for each record in the partition via the getResults() call.
 
 I repartitioned in the hope that this will result in 10 clients hitting Solr 
 simultaneously (I would like to go upto maybe 30-40 simultaneous clients if I 
 can). However, I counted the number of open connections using netstat -anp | 
 grep :8983.*ESTABLISHED in a loop on the Solr box and observed that Solr 
 has a constant 4 clients (ie, equal to the number of workers) over the 
 lifetime of the run.
 
 My observation leads me to believe that each worker processes a single stream 
 of work sequentially. However, from what I understand about how Spark works, 
 each worker should be able to process number of tasks parallelly, and that 
 repartition() is a hint for it to do so.
 
 Is there some SparkConf environment variable I should set to increase 
 parallelism in these workers, or should I just configure a cluster with 
 multiple workers per machine? Or is there something I am doing wrong?
 
 Thank you in advance for any pointers you can provide.
 
 -sujit
 


spark streaming disk hit

2015-07-21 Thread Abhishek R. Singh
Is it fair to say that Storm stream processing is completely in memory, whereas 
spark streaming would take a disk hit because of how shuffle works?

Does spark streaming try to avoid disk usage out of the box?

-Abhishek-



Re: spark streaming disk hit

2015-07-21 Thread Abhishek R. Singh
Thanks TD - appreciate the response !

On Jul 21, 2015, at 1:54 PM, Tathagata Das t...@databricks.com wrote:

 Most shuffle files are really kept around in the OS's buffer/disk cache, so 
 it is still pretty much in memory. If you are concerned about performance, 
 you have to do a holistic comparison for end-to-end performance. You could 
 take a look at this. 
 
 https://spark-summit.org/2015/events/towards-benchmarking-modern-distributed-streaming-systems/
 
 On Tue, Jul 21, 2015 at 11:57 AM, Abhishek R. Singh 
 abhis...@tetrationanalytics.com wrote:
 Is it fair to say that Storm stream processing is completely in memory, 
 whereas spark streaming would take a disk hit because of how shuffle works?
 
 Does spark streaming try to avoid disk usage out of the box?
 
 -Abhishek-
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 



Re: Grouping runs of elements in a RDD

2015-06-30 Thread Abhishek R. Singh
Could you use a custom partitioner to preserve boundaries, such that all related
tuples end up on the same partition?
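
To make that concrete, a sketch of the idea with made-up names and key shape:
partition on a precomputed group id so every element of a run lands in the same
partition, then finish the grouping locally with mapPartitions.

// Sketch: keep whole "runs" together by partitioning on a group id.
import org.apache.spark.Partitioner

class GroupPartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts

  // keys are assumed to be (groupId, elementIndex); only groupId decides placement
  override def getPartition(key: Any): Int = key match {
    case (groupId: Long, _) => (groupId % numParts).toInt.abs
    case other              => other.hashCode.abs % numParts
  }
}

// usage: pairRdd.partitionBy(new GroupPartitioner(16)).mapPartitions(groupRuns)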

On Jun 30, 2015, at 12:00 PM, RJ Nowling rnowl...@gmail.com wrote:

 Thanks, Reynold.  I still need to handle incomplete groups that fall between 
 partition boundaries. So, I need a two-pass approach. I came up with a 
 somewhat hacky way to handle those using the partition indices and key-value 
 pairs as a second pass after the first.
 
 OCaml's std library provides a function called group() that takes a break 
 function that operators on pairs of successive elements.  It seems a similar 
 approach could be used in Spark and would be more efficient than my approach 
 with key-value pairs since you know the ordering of the partitions.
 
 Has this need been expressed by others?  
 
 On Tue, Jun 30, 2015 at 1:03 PM, Reynold Xin r...@databricks.com wrote:
 Try mapPartitions, which gives you an iterator, and you can produce an 
 iterator back.
 
 
 On Tue, Jun 30, 2015 at 11:01 AM, RJ Nowling rnowl...@gmail.com wrote:
 Hi all,
 
 I have a problem where I have a RDD of elements:
 
 Item1 Item2 Item3 Item4 Item5 Item6 ...
 
 and I want to run a function over them to decide which runs of elements to 
 group together:
 
 [Item1 Item2] [Item3] [Item4 Item5 Item6] ...
 
 Technically, I could use aggregate to do this, but I would have to use a List 
 of List of T which would produce a very large collection in memory.
 
 Is there an easy way to accomplish this?  e.g.,, it would be nice to have a 
 version of aggregate where the combination function can return a complete 
 group that is added to the new RDD and an incomplete group which is passed to 
 the next call of the reduce function.
 
 Thanks,
 RJ
 
 



spark sql error with proto/parquet

2015-04-18 Thread Abhishek R. Singh
I have created a bunch of protobuf based parquet files that I want to 
read/inspect using Spark SQL. However, I am running into exceptions and not 
able to proceed much further:

This succeeds (probably because there is no action yet); I can also printSchema()
and count() without any issues:

scala> val df = sqlContext.load("my_root_dir/201504101000", "parquet")

scala> df.select(df("summary")).first

15/04/18 17:03:03 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 5.0 
(TID 27, xxx.yyy.com): parquet.io.ParquetDecodingException: Can not read value 
at 0 in block -1 in file 
hdfs://xxx.yyy.com:8020/my_root_dir/201504101000/0.parquet
at 
parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
at 
parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
at 
org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: parquet.io.ParquetDecodingException: The requested schema is not 
compatible with the file schema. incompatible types: optional group …


I could convert my protos into JSON and then back to Parquet, but that seems
wasteful!

Also, I will be happy to contribute and make protobuf work with Spark SQL if I 
can get some guidance/help/pointers. Help appreciated.

-Abhishek-

Re: Dataframes Question

2015-04-18 Thread Abhishek R. Singh
I am no expert myself, but from what I understand DataFrame is grandfathering
SchemaRDD. This was done for API stability as Spark SQL matured out of alpha as
part of the 1.3.0 release.

It is forward-looking and brings DataFrame-like syntax that was not available
with the older SchemaRDD.
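
As a small illustration of the DataFrame-like syntax point, assuming a Spark
1.3-era spark-shell with sqlContext in scope and the example JSON file that ships
with Spark:

// Column-expression style that came with DataFrames in 1.3
val df = sqlContext.jsonFile("examples/src/main/resources/people.json")
df.filter(df("age") > 21).select(df("name")).show()

// the same query via SQL, after registering the DataFrame as a temp table
df.registerTempTable("people")
sqlContext.sql("SELECT name FROM people WHERE age > 21").show()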

On Apr 18, 2015, at 4:43 PM, Arun Patel arunp.bigd...@gmail.com wrote:

 Experts,
 
 I have few basic questions on DataFrames vs Spark SQL.  My confusion is more 
 with DataFrames. 
 
 1)  What is the difference between Spark SQL and DataFrames?  Are they same?
 2)  Documentation says SchemaRDD is renamed as DataFrame. This means 
 SchemaRDD is not existing in 1.3?  
 3)  As per documentation, it looks like creating dataframe is no different 
 than SchemaRDD -  df = 
 sqlContext.jsonFile(examples/src/main/resources/people.json).  
 So, my question is what is the difference?
 
 Thanks for your help.  
 
 Arun

