[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-07-09 Thread tmalaska
Github user tmalaska closed the pull request at:

https://github.com/apache/spark/pull/566


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-06-21 Thread tmalaska
Github user tmalaska commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-46763419
  
New Pull request https://github.com/apache/spark/pull/1168




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-06-21 Thread tmalaska
Github user tmalaska commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-46755792
  
I'm going to have to make a new pull request, because I had to drop the repo 
that belonged to this pull request. I will update the ticket with the 
information when it's ready.




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-06-20 Thread tmalaska
Github user tmalaska commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-46726131
  
No worries. I'm starting to free up, so I would love to do more work. I 
will finish this one up, then the Flume encryption one. Then if you have 
anything else, let me at it.

Thanks




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-06-20 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-46725214
  
Sorry, Ted, that this has been sitting here for so long. Will get this in 
ASAP. 
Other than a few nits, it LGTM. :)




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-06-20 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/566#discussion_r14040594
  
--- Diff: 
external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
 ---
@@ -36,17 +36,27 @@ import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.Logging
 import org.apache.spark.streaming.receiver.Receiver
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.channel.ChannelPipelineFactory
+import java.util.concurrent.Executors
+import org.jboss.netty.channel.Channels
+import org.jboss.netty.handler.codec.compression.ZlibDecoder
--- End diff --

Please dedup and sort; see the import style in 
https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide
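For reference, one way the deduplicated, sorted import block might look after applying the linked style guide (a sketch; the guide groups java, then third-party, then org.apache.spark imports, alphabetized within each group):

```scala
// Hypothetical reordering of the imports from the diff above,
// grouped and sorted per the Spark code style guide:
import java.util.concurrent.Executors

import org.jboss.netty.channel.{ChannelPipelineFactory, Channels}
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.compression.ZlibDecoder

import org.apache.spark.Logging
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.Receiver
```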




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-06-20 Thread tmalaska
Github user tmalaska commented on a diff in the pull request:

https://github.com/apache/spark/pull/566#discussion_r14040514
  
--- Diff: 
external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
 ---
@@ -134,22 +144,64 @@ private[streaming]
 class FlumeReceiver(
 host: String,
 port: Int,
-storageLevel: StorageLevel
+storageLevel: StorageLevel,
+enableDecompression: Boolean
   ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
 
   lazy val responder = new SpecificResponder(
 classOf[AvroSourceProtocol], new FlumeEventServer(this))
-  lazy val server = new NettyServer(responder, new InetSocketAddress(host, 
port))
+  var server: NettyServer = null
+
+  private def initServer() = {
+if (enableDecompression) {
+  val channelFactory = new NioServerSocketChannelFactory
+(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+  val channelPipelieFactory = new CompressionChannelPipelineFactory()
+  
+  new NettyServer(
+responder, 
+new InetSocketAddress(host, port),
+channelFactory, 
+channelPipelieFactory, 
+null)
+} else {
+  new NettyServer(responder, new InetSocketAddress(host, port))
+}
+  }
 
   def onStart() {
-server.start()
+synchronized {
+  if (server == null) {
+server = initServer()
+server.start()
+  } else {
+logWarning("Flume receiver being asked to start more then once 
with out close")
+  }
+}
 logInfo("Flume receiver started")
   }
 
   def onStop() {
-server.close()
+synchronized {
+  if (server != null) {
+server.close()
+server = null
+  }
+}
 logInfo("Flume receiver stopped")
   }
 
   override def preferredLocation = Some(host)
 }
+
+private[streaming]
+class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
+
+  def getPipeline() = {
--- End diff --

Cool, will do before the weekend is done. Thanks.




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-06-20 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/566#discussion_r14040386
  
--- Diff: 
external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
 ---
@@ -134,22 +144,64 @@ private[streaming]
 class FlumeReceiver(
 host: String,
 port: Int,
-storageLevel: StorageLevel
+storageLevel: StorageLevel,
+enableDecompression: Boolean
   ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
 
   lazy val responder = new SpecificResponder(
 classOf[AvroSourceProtocol], new FlumeEventServer(this))
-  lazy val server = new NettyServer(responder, new InetSocketAddress(host, 
port))
+  var server: NettyServer = null
+
+  private def initServer() = {
+if (enableDecompression) {
+  val channelFactory = new NioServerSocketChannelFactory
+(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+  val channelPipelieFactory = new CompressionChannelPipelineFactory()
+  
+  new NettyServer(
+responder, 
+new InetSocketAddress(host, port),
+channelFactory, 
+channelPipelieFactory, 
+null)
+} else {
+  new NettyServer(responder, new InetSocketAddress(host, port))
+}
+  }
 
   def onStart() {
-server.start()
+synchronized {
+  if (server == null) {
+server = initServer()
+server.start()
+  } else {
+logWarning("Flume receiver being asked to start more then once 
with out close")
+  }
+}
 logInfo("Flume receiver started")
   }
 
   def onStop() {
-server.close()
+synchronized {
+  if (server != null) {
+server.close()
+server = null
+  }
+}
 logInfo("Flume receiver stopped")
   }
 
   override def preferredLocation = Some(host)
 }
+
+private[streaming]
+class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
+
+  def getPipeline() = {
--- End diff --

Just a line of comment saying what pipeline this returns. For Flume 
noobs like me ;)
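A sketch of the kind of doc comment being requested, on top of the factory from the diff above (the `ZlibEncoder` handler, compression level, and comment wording are illustrative assumptions, not the merged code):

```scala
import org.jboss.netty.channel.{ChannelPipelineFactory, Channels}
import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}

private[streaming]
class CompressionChannelPipelineFactory extends ChannelPipelineFactory {

  /** Returns a channel pipeline that zlib-decompresses incoming Avro data
    * and zlib-compresses outgoing responses, so that a Flume Avro sink
    * configured with compression can talk to this receiver. */
  def getPipeline() = {
    val pipeline = Channels.pipeline()
    val encoder = new ZlibEncoder(6) // assumed compression level
    pipeline.addFirst("deflater", encoder)
    pipeline.addFirst("inflater", new ZlibDecoder())
    pipeline
  }
}
```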




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-06-20 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/566#discussion_r14040366
  
--- Diff: 
external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
 ---
@@ -134,22 +144,64 @@ private[streaming]
 class FlumeReceiver(
 host: String,
 port: Int,
-storageLevel: StorageLevel
+storageLevel: StorageLevel,
+enableDecompression: Boolean
   ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
 
   lazy val responder = new SpecificResponder(
 classOf[AvroSourceProtocol], new FlumeEventServer(this))
-  lazy val server = new NettyServer(responder, new InetSocketAddress(host, 
port))
+  var server: NettyServer = null
+
+  private def initServer() = {
+if (enableDecompression) {
+  val channelFactory = new NioServerSocketChannelFactory
+(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+  val channelPipelieFactory = new CompressionChannelPipelineFactory()
+  
+  new NettyServer(
+responder, 
+new InetSocketAddress(host, port),
+channelFactory, 
+channelPipelieFactory, 
+null)
+} else {
+  new NettyServer(responder, new InetSocketAddress(host, port))
+}
+  }
 
   def onStart() {
-server.start()
+synchronized {
+  if (server == null) {
+server = initServer()
+server.start()
+  } else {
+logWarning("Flume receiver being asked to start more then once 
with out close")
+  }
+}
 logInfo("Flume receiver started")
   }
 
   def onStop() {
-server.close()
+synchronized {
+  if (server != null) {
+server.close()
+server = null
+  }
+}
 logInfo("Flume receiver stopped")
   }
 
   override def preferredLocation = Some(host)
 }
+
+private[streaming]
--- End diff --

Can you add comments to this class, explaining what this class does and why 
it is necessary?





[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-06-20 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/566#discussion_r14040312
  
--- Diff: 
external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
 ---
@@ -134,22 +144,64 @@ private[streaming]
 class FlumeReceiver(
 host: String,
 port: Int,
-storageLevel: StorageLevel
+storageLevel: StorageLevel,
+enableDecompression: Boolean
   ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
 
   lazy val responder = new SpecificResponder(
 classOf[AvroSourceProtocol], new FlumeEventServer(this))
-  lazy val server = new NettyServer(responder, new InetSocketAddress(host, 
port))
+  var server: NettyServer = null
+
+  private def initServer() = {
+if (enableDecompression) {
+  val channelFactory = new NioServerSocketChannelFactory
+(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+  val channelPipelieFactory = new CompressionChannelPipelineFactory()
+  
+  new NettyServer(
+responder, 
+new InetSocketAddress(host, port),
+channelFactory, 
+channelPipelieFactory, 
+null)
+} else {
+  new NettyServer(responder, new InetSocketAddress(host, port))
+}
+  }
 
   def onStart() {
-server.start()
+synchronized {
+  if (server == null) {
+server = initServer()
+server.start()
+  } else {
+logWarning("Flume receiver being asked to start more then once 
with out close")
+  }
+}
 logInfo("Flume receiver started")
   }
 
   def onStop() {
-server.close()
+synchronized {
+  if (server != null) {
+server.close()
+server = null
+  }
+}
 logInfo("Flume receiver stopped")
   }
 
   override def preferredLocation = Some(host)
 }
+
+private[streaming]
+class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
+
+  def getPipeline() = {
+  val pipeline = Channels.pipeline()
--- End diff --

Formatting issue: 2-space indents required.




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-06-20 Thread tmalaska
Github user tmalaska commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-46724152
  
Let me know if there is anything I can do to help this go through.

Thanks tdas


On Fri, Jun 20, 2014 at 4:38 PM, Tathagata Das 
wrote:

> Jenkins, test this again.
>
> —
> Reply to this email directly or view it on GitHub
> .
>




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-06-20 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-46724002
  
Jenkins, test this again. 




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-06-17 Thread tmalaska
Github user tmalaska commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-46372202
  
Hey tdas,

I was going to do 1642 tonight, but I noticed these changes are not in the 
code yet.  What should I do?

Thanks




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-06-04 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-45166775
  
Yeah, starting to look at all pending PRs now.


On Wed, Jun 4, 2014 at 4:20 PM, Patrick Wendell 
wrote:

> @tdas  this seems pretty useful - could you take
> a look?
>
> —
> Reply to this email directly or view it on GitHub
> .
>




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-06-04 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-45164568
  
@tdas this seems pretty useful - could you take a look?




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-05-05 Thread tmalaska
Github user tmalaska commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-42189283
  
LOL tdas, how's it going? Just pinging.




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-05-01 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-41973712
  
Got sidetracked, will take a look ASAP!
On May 1, 2014 12:52 PM, "Ted Malaska"  wrote:

> Hey tdas,
>
> How is this Jira looking. Is there anything I need to do to get it passed?
>
> —
> Reply to this email directly or view it on 
GitHub
> .
>




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-05-01 Thread tmalaska
Github user tmalaska commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-41949374
  
Hey tdas,

How is this JIRA looking? Is there anything I need to do to get it passed?




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-04-29 Thread tmalaska
Github user tmalaska commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-41732028
  
I already updated the code and tested it. Feel free to commit unless you 
see anything wrong.

If you commit it in the next couple of hours, I can start on SPARK-1642 
tonight or tomorrow morning.




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-04-29 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-41717224
  
aah, right, makes sense. Please go ahead with it, and test it as well. 




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-04-29 Thread tmalaska
Github user tmalaska commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-41663386
  
Let me know if the changes are OK. The only difference from what you told 
me to do is that I added a check to prevent a double start. Let me know if you 
want me to take it out; if so, I can make the change very fast.

  if (server == null) {
server = initServer()
server.start()
  } else {
logWarning("Flume receiver being asked to start more then once with 
out close")
  }




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-04-28 Thread tmalaska
Github user tmalaska commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-41631586
  
Will do.  I will start tomorrow.  Shouldn't take long.




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-04-28 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-41611069
  
Hey @tmalaska, I pondered about the code a bit more, especially about the 
lazy vals. The lazy val in this case is probably not a good idea. The receivers 
are now (after #300) designed to be restartable multiple times, so 
onStart() + onStop() could be called multiple times if the receiver decides to 
restart itself (to handle exceptions). In that case, start() would be called on 
the netty server after it has been closed, and I am not sure that is possible. So 
it's best to create a new NettyServer every time onStart() is called, rather 
than lazily initializing and reusing the netty server. 

So it's probably best to do something like this.
```
FlumeReceiver {
  var server: NettyServer = null

  def onStart() {
    synchronized {
      server = initServer()
      server.start()
    }
  }

  def onStop() {
    synchronized {
      if (server != null) {
        server.stop()
      }
    }
  }
  ...
}
```




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-04-28 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-41608948
  
Jenkins, this is okay to test.




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-04-28 Thread tmalaska
Github user tmalaska commented on a diff in the pull request:

https://github.com/apache/spark/pull/566#discussion_r12044730
  
--- Diff: 
external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala 
---
@@ -66,6 +84,23 @@ object FlumeUtils {
   port: Int,
   storageLevel: StorageLevel
 ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-createStream(jssc.ssc, hostname, port, storageLevel)
+createStream(jssc.ssc, hostname, port, storageLevel, false)
+  }
+
+  /**
+   * Creates a input stream from a Flume source.
+   * @param hostname Hostname of the slave machine to which the flume data 
will be sent
+   * @param port Port of the slave machine to which the flume data 
will be sent
+   * @param storageLevel  Storage level to use for storing the received 
objects
+   * @param enableCompression  Should Netty Server decode input stream 
from client  
--- End diff --

Done




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-04-28 Thread tmalaska
Github user tmalaska commented on a diff in the pull request:

https://github.com/apache/spark/pull/566#discussion_r12044643
  
--- Diff: 
external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
 ---
@@ -153,3 +181,15 @@ class FlumeReceiver(
 
   override def preferredLocation = Some(host)
 }
+
+private[streaming]
+class CompressionChannelPipelineFactory() extends ChannelPipelineFactory {
--- End diff --

Done




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-04-28 Thread tmalaska
Github user tmalaska commented on a diff in the pull request:

https://github.com/apache/spark/pull/566#discussion_r12044638
  
--- Diff: 
external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
 ---
@@ -85,4 +108,14 @@ class FlumeStreamSuite extends TestSuiteBase {
   assert(outputBuffer(i).head.event.getHeaders.get("test") === 
"header")
 }
   }
+
+  class CompressionChannelFactory(compressionLevel: Int) extends
--- End diff --

done




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-04-28 Thread tmalaska
Github user tmalaska commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-41546071
  
OK, I have reviewed the commits and I will be making changes this morning. 
Thanks tdas.




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-04-27 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/566#discussion_r12035701
  
--- Diff: 
external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala 
---
@@ -36,7 +36,25 @@ object FlumeUtils {
   port: Int,
   storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
 ): ReceiverInputDStream[SparkFlumeEvent] = {
-val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, 
hostname, port, storageLevel)
+createStream(ssc, hostname, port, storageLevel, false)
+  }
+
+  /**
+   * Create a input stream from a Flume source.
+   * @param ssc  StreamingContext object
+   * @param hostname Hostname of the slave machine to which the flume data 
will be sent
+   * @param port Port of the slave machine to which the flume data 
will be sent
+   * @param storageLevel  Storage level to use for storing the received 
objects
+   * @param enableCompression  Should Netty Server decode input stream 
from client  
--- End diff --

Also, shouldn't this parameter be `enableDecompression`, as from Spark 
Streaming's point of view it will be decompressing the data?




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-04-27 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-41525190
  
Except for a few nits, it looks good to me. However, since it's so late in the 
process of Spark 1.0, I am a little extra afraid of breaking something. If 
possible, can you run this on a cluster with real data transfer from a producer 
to see if it works? 




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-04-27 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/566#discussion_r12035643
  
--- Diff: 
external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala 
---
@@ -66,6 +84,23 @@ object FlumeUtils {
   port: Int,
   storageLevel: StorageLevel
 ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-createStream(jssc.ssc, hostname, port, storageLevel)
+createStream(jssc.ssc, hostname, port, storageLevel, false)
+  }
+
+  /**
+   * Creates a input stream from a Flume source.
+   * @param hostname Hostname of the slave machine to which the flume data 
will be sent
+   * @param port Port of the slave machine to which the flume data 
will be sent
+   * @param storageLevel  Storage level to use for storing the received 
objects
+   * @param enableCompression  Should Netty Server decode input stream 
from client  
--- End diff --

Same comment about enableCompression as above.




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-04-27 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/566#discussion_r12035652
  
--- Diff: 
external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
 ---
@@ -85,4 +108,14 @@ class FlumeStreamSuite extends TestSuiteBase {
   assert(outputBuffer(i).head.event.getHeaders.get("test") === 
"header")
 }
   }
+
+  class CompressionChannelFactory(compressionLevel: Int) extends
--- End diff --

Nit: No need to wrap the line here. Usually, it is either
```
class X extends Y
```
or
```
class X 
  extends Y
```




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-04-27 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/566#discussion_r12035635
  
--- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala ---
@@ -36,7 +36,25 @@ object FlumeUtils {
   port: Int,
   storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
 ): ReceiverInputDStream[SparkFlumeEvent] = {
-val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, 
hostname, port, storageLevel)
+createStream(ssc, hostname, port, storageLevel, false)
+  }
+
+  /**
+   * Create a input stream from a Flume source.
+   * @param ssc  StreamingContext object
+   * @param hostname Hostname of the slave machine to which the flume data 
will be sent
+   * @param port Port of the slave machine to which the flume data 
will be sent
+   * @param storageLevel  Storage level to use for storing the received 
objects
+   * @param enableCompression  Should Netty Server decode input stream 
from client  
--- End diff --

I am a little confused. What does "should netty server decode input stream"
have to do with "compression"? Maybe you wanted to say "should netty server
decompress input stream"?
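For context, the compression under discussion is zlib/deflate: FLUME-1915 lets Flume's Avro sink deflate-compress events on the wire, so the Netty server inside the receiver must inflate them. A minimal, self-contained sketch of that round trip using only `java.util.zip` (the object and method names here are illustrative, not from the patch):

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.{Deflater, Inflater}

object ZlibRoundTrip {
  // Deflate-compress a byte array, as a compressing Avro sink would on the wire.
  def compress(data: Array[Byte], level: Int = 6): Array[Byte] = {
    val deflater = new Deflater(level)
    deflater.setInput(data)
    deflater.finish()
    val buf = new Array[Byte](1024)
    val out = new ByteArrayOutputStream()
    while (!deflater.finished()) {
      val n = deflater.deflate(buf)
      out.write(buf, 0, n)
    }
    deflater.end()
    out.toByteArray
  }

  // Inflate the bytes back, as the receiver's Netty pipeline must before
  // handing the payload to the Avro decoder.
  def decompress(data: Array[Byte]): Array[Byte] = {
    val inflater = new Inflater()
    inflater.setInput(data)
    val buf = new Array[Byte](1024)
    val out = new ByteArrayOutputStream()
    while (!inflater.finished()) {
      val n = inflater.inflate(buf)
      out.write(buf, 0, n)
    }
    inflater.end()
    out.toByteArray
  }
}
```

This is why the flag is better described as "decompress" than "decode": the server is not interpreting the Avro framing differently, it is simply inflating the stream before the usual decoding.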




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-04-27 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/566#discussion_r12035631
  
--- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala ---
@@ -153,3 +181,15 @@ class FlumeReceiver(
 
   override def preferredLocation = Some(host)
 }
+
+private[streaming]
+class CompressionChannelPipelineFactory() extends ChannelPipelineFactory {
--- End diff --

No need for `()` when no parameters are present.




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-04-27 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-41524968
  
Jenkins, test this please.





[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-04-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/566#issuecomment-41468545
  
Can one of the admins verify this patch?




[GitHub] spark pull request: SPARK-1478: Upgrade FlumeInputDStream's FlumeR...

2014-04-26 Thread tmalaska
GitHub user tmalaska opened a pull request:

https://github.com/apache/spark/pull/566

SPARK-1478: Upgrade FlumeInputDStream's FlumeReceiver to support FLUME-1915



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tmalaska/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/566.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #566


commit 6a390690a19d4fe7d1c3c9029de66b94eb15be45
Author: tmalaska 
Date:   2014-04-26T13:17:02Z

Finished Second draft
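The receiver-side change in this patch only matters if the sending Flume agent actually compresses its Avro sink output. A sketch of the matching agent configuration, assuming FLUME-1915's deflate support (the agent name, sink name, channel name, hostname, and port are placeholders):

```properties
# Avro sink shipping events to the Spark FlumeReceiver, with deflate compression
agent1.sinks.spark-sink.type = avro
agent1.sinks.spark-sink.hostname = spark-receiver-host
agent1.sinks.spark-sink.port = 41414
agent1.sinks.spark-sink.compression-type = deflate
agent1.sinks.spark-sink.compression-level = 6
agent1.sinks.spark-sink.channel = memory-channel
```

With this in place, the corresponding `FlumeUtils.createStream(..., enableCompression = true)` call on the Spark side tells the receiver's Netty server to decompress the incoming stream.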



