[jira] [Comment Edited] (BEAM-3820) SolrIO: Allow changing batchSize for writes

2018-03-28 Thread Tim Robertson (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417017#comment-16417017
 ] 

Tim Robertson edited comment on BEAM-3820 at 3/28/18 9:57 AM:
--

Thanks [~echauchot] - understood, and I would propose documenting it as such.

I'll pick this up after the [BEAM-3848 pull
request|https://github.com/apache/beam/pull/4905] completes, as I believe it
should be considered for inclusion (unless it is decided not to do it). My own
setup currently uses a batch size of {{1000 x #SOLR_SERVERS}}, which yielded a
26% increase in total throughput in production, and {{100}} on the dev cluster,
which improved stability.
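
For illustration, a rough sketch of how I picture the option being used once it
exists - {{withMaxBatchSize}} is only a placeholder name for the setter proposed
here (mirroring {{ElasticsearchIO}}), the ZooKeeper address and collection are
example values, and {{docs}} stands for whatever {{PCollection}} of
{{SolrInputDocument}} the pipeline already produces:

{code:java}
// Hypothetical usage of the proposed option; withMaxBatchSize does not exist yet
// and is only a placeholder name for the setter discussed in this ticket.
int solrServers = 4;                        // example: number of SOLR servers
PCollection<SolrInputDocument> docs = ...;  // produced earlier in the pipeline

docs.apply(SolrIO.write()
    .withConnectionConfiguration(
        SolrIO.ConnectionConfiguration.create("zk1:2181,zk2:2181/solr"))
    .to("my_collection")
    .withMaxBatchSize(1000 * solrServers)); // today this is hard-coded to 1000
{code}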


was (Author: timrobertson100):
Thanks [~echauchot] - understood, and I would propose documenting it as such.

I'll pick this up after the [BEAM-3848 pull
request|https://github.com/apache/beam/pull/4905] completes, as I believe it
should be considered for inclusion. My own setup currently uses a batch size of
{{1000 x #SOLR_SERVERS}}, which yielded a 26% increase in total throughput in
production, and {{100}} on the dev cluster, which improved stability.

> SolrIO: Allow changing batchSize for writes
> ---
>
> Key: BEAM-3820
> URL: https://issues.apache.org/jira/browse/BEAM-3820
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-solr
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Tim Robertson
>Assignee: Tim Robertson
>Priority: Trivial
>
> The SolrIO hard codes the batchSize for writes at 1000.  It would be a good 
> addition to allow the user to set the batchSize explicitly (similar to the 
> ElasticsearchIO)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (BEAM-3820) SolrIO: Allow changing batchSize for writes

2018-03-28 Thread Tim Robertson (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417017#comment-16417017
 ] 

Tim Robertson edited comment on BEAM-3820 at 3/28/18 9:54 AM:
--

Thanks [~echauchot] - understood, and I would propose documenting it as such.

I'll pick this up after the [BEAM-3848 pull
request|https://github.com/apache/beam/pull/4905] completes, as I believe it
should be considered for inclusion. My own setup currently uses a batch size of
{{1000 x #SOLR_SERVERS}}, which yielded a 26% increase in total throughput in
production, and {{100}} on the dev cluster, which improved stability.


was (Author: timrobertson100):
Thanks [~echauchot] - understood, and I would propose documenting it as such.

I'll pick this up after the [BEAM-3848 pull
request|https://github.com/apache/beam/pull/4905] completes, as I believe it
should be considered for inclusion. My own setup currently uses a batch size of
{{1000 x #SOLR_SERVERS}}, which yielded a 26% increase in total throughput in
production, and {{100}} on the dev cluster for stability.

> SolrIO: Allow changing batchSize for writes
> ---
>
> Key: BEAM-3820
> URL: https://issues.apache.org/jira/browse/BEAM-3820
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-solr
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Tim Robertson
>Assignee: Tim Robertson
>Priority: Trivial
>
> The SolrIO hard codes the batchSize for writes at 1000.  It would be a good 
> addition to allow the user to set the batchSize explicitly (similar to the 
> ElasticsearchIO)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (BEAM-3820) SolrIO: Allow changing batchSize for writes

2018-03-15 Thread Tim Robertson (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400372#comment-16400372
 ] 

Tim Robertson edited comment on BEAM-3820 at 3/15/18 7:11 PM:
--

Thanks for engaging in this discussion [~jkff]. I'll try and provide context 
and justification here in the hope of changing your mind.

On this specific issue I am preparing pipelines to test the ingestion speed of
SOLR from Avro files using Beam on Spark, with configuration profiles to run on
different hardware - preproduction and production. The SOLR schema is
approximately 200 text fields, and I'm currently ingesting at 250M docs/hr on
the preproduction cluster.

So far, the tuning I have performed:
h5. Spark

Obvious stuff, like the number of executors (JVMs), the number of tasks each
runs concurrently, and the memory for each JVM.
h5. Parallelism

Because Beam does not allow a repartition of the underlying Spark RDD, the
total parallelism of how the blocks are read from {{Avro}} can be tuned using
e.g. {{--conf spark.default.parallelism=1000}}. Combined with the number of
concurrent tasks you allow Spark to run, this controls the number of concurrent
clients {{SOLR}} sees. Without it, the Avro reader can present fewer tasks
than you wish, and failures can result in large amounts of work being retried
(BEAM-3848).
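
As a concrete illustration of the settings above, here is one way to pin those
values from code rather than via {{spark-submit}} flags - a sketch only,
assuming the Spark runner's {{SparkContextOptions}}; the application name and
values are placeholders, not recommendations:

{code:java}
// Sketch: supply spark.default.parallelism (and executor sizing) through a
// provided JavaSparkContext instead of spark-submit --conf flags.
SparkConf conf = new SparkConf()
    .setAppName("solr-ingest")
    .set("spark.default.parallelism", "1000")  // total parallelism of the Avro read
    .set("spark.executor.memory", "8g");       // per-executor JVM memory (example)

JavaSparkContext jsc = new JavaSparkContext(conf);

SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
options.setRunner(SparkRunner.class);
options.setUsesProvidedSparkContext(true);
options.setProvidedSparkContext(jsc);

Pipeline pipeline = Pipeline.create(options);  // build the SolrIO pipeline against this
{code}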
h5. SOLR server tuning

Running with defaults can result in CPU resource starvation on underpowered
machines, so I've tuned the following:
 - Increasing {{solr.hdfs.nrtcachingdirectory.maxmergesizemb}} and 
{{solr.hdfs.nrtcachingdirectory.maxcachedmb}} to reduce File IO. Defaults can 
lead to very high load on the {{HDFS NameNode}}
 - Increasing {{ramBufferSizeMB}} to {{1024MB}} to further reduce File IO.
 - Reducing the {{maxIndexingThreads}} and relaxing the aggressiveness of the 
{{TieredMergePolicy}} to reduce CPU pressure on the server.
 - Heap, off-heap memory and the HDFS cache size

h5. The problem

From my understanding, I believe {{SolrIO}} works as follows:
 - It accumulates a batch (hard-coded to 1000) of {{SolrInputDocument}}s, which
it then passes to the underlying {{SolrJ}} client, an instance of
{{CloudSolrClient}}.
 - The {{CloudSolrClient}} will accept the batch, then negotiate with ZooKeeper 
to identify the current shard leader for each document based on the routing 
defined in the Solr collection (implicit routing in my case).
 - The batch of 1000 is then split into sub-collections destined for the target
shards, and these are sent in parallel to the servers hosting each shard leader
involved
 - Each leader then writes to the SOLR TLog for its shard and forwards to any
configured replicas before returning {{HTTP 200}} confirming receipt of the
batch
 - Once all confirmations are successfully received, the {{CloudSolrClient}}
returns to the {{SolrIO}} (see the sketch below)

If my understanding is correct, it is important to note that the Beam batch of
1000 docs is actually split up based on the configured number of SOLR shards
and servers, and on the routing characteristics of the documents it contains.
The comments in {{SolrIO}} suggest that the author might have assumed the
batch would be a single HTTP call to one server:
{quote}{{// 1000 for batch size is good enough in many cases,}}
{{// ex: if document size is large, around 10KB, the request's size will be around 10MB}}
{{// if document seize is small, around 1KB, the request's size will be around 1MB}}
{quote}
I don't see how Beam could ever really optimise for this, though. There are
cases (like mine) where, as the pipeline developer, I know my routing strategy,
my network bandwidth and my target environments - I would run with a different
profile for each environment to control the true batch sizes observed at the
SOLR server.
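
To make the per-environment profiles concrete, this is roughly how I would wire
it up if the proposed setter lands - {{SolrIndexingOptions}},
{{getSolrBatchSize}} and {{withMaxBatchSize}} are hypothetical names, not
existing APIs; only the standard {{PipelineOptions}} machinery is real Beam:

{code:java}
// Hypothetical per-environment wiring of the batch size.
public interface SolrIndexingOptions extends PipelineOptions {
  @Description("SOLR write batch size, e.g. 100 on dev, 1000 x #SOLR_SERVERS in production")
  @Default.Integer(1000)
  Integer getSolrBatchSize();
  void setSolrBatchSize(Integer value);
}

// ...when constructing the pipeline:
SolrIndexingOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(SolrIndexingOptions.class);

SolrIO.Write write = SolrIO.write()
    .withConnectionConfiguration(SolrIO.ConnectionConfiguration.create("zk1:2181/solr"))
    .to("my_collection")
    .withMaxBatchSize(options.getSolrBatchSize());  // the setter proposed in this ticket
{code}

Passing {{--solrBatchSize=100}} on the dev profile and {{--solrBatchSize=4000}}
in production would then control the true batch sizes without touching code.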

 

 


was (Author: timrobertson100):
Thanks for engaging in this discussion [~jkff]. I'll try and provide context 
and justification here in the hope of changing your mind.

On this specific issue I am preparing pipelines to test ingestion speed of SOLR 
from Avro files using Beam on Spark. I am preparing this with configuration to 
run on different hardware - preproduction and production. The SOLR schema is 
approx 200 fields of text and I'm currently ingesting at 250M docs/hr on the 
preproduction cluster.

So far, the tuning I have performed:
h5. Spark

Obvious stuff, like the number of executors (JVMs), the number of tasks each
runs concurrently, and the memory for each JVM.
h5. Parallelism

Because Beam does not allow a repartition of the underlying Spark RDD, the
total parallelism of how the blocks are read from {{Avro}} can be tuned using
e.g. {{--conf spark.default.parallelism=1000}}. Combined with the number of
concurrent tasks you allow Spark to run, this controls the number of concurrent
clients {{SOLR}} sees. Without it, the Avro reader can present fewer tasks
than you wish, and failures can result in large amounts being 

[jira] [Comment Edited] (BEAM-3820) SolrIO: Allow changing batchSize for writes

2018-03-15 Thread Tim Robertson (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400372#comment-16400372
 ] 

Tim Robertson edited comment on BEAM-3820 at 3/15/18 2:57 PM:
--

Thanks for engaging in this discussion [~jkff]. I'll try and provide context 
and justification here in the hope of changing your mind.

On this specific issue I am preparing pipelines to test ingestion speed of SOLR 
from Avro files using Beam on Spark. I am preparing this with configuration to 
run on different hardware - preproduction and production. The SOLR schema is 
approx 200 fields of text and I'm currently ingesting at 250M docs/hr on the 
preproduction cluster.

So far, the tuning I have performed:
h5. Spark

Obvious stuff, like the number of executors (JVMs), the number of tasks each
runs concurrently, and the memory for each JVM.
h5. Parallelism

Because Beam does not allow a repartition of the underlying Spark RDD, the
total parallelism of how the blocks are read from {{Avro}} can be tuned using
e.g. {{--conf spark.default.parallelism=1000}}. Combined with the number of
concurrent tasks you allow Spark to run, this controls the number of concurrent
clients {{SOLR}} sees. Without it, the Avro reader can present fewer tasks
than you wish, and failures can result in large amounts of work being retried
(BEAM-3848).
h5. SOLR server tuning

Running with defaults can result in CPU resource starvation on underpowered 
machines so I've tuned the following:
 - Increasing {{solr.hdfs.nrtcachingdirectory.maxmergesizemb}} and 
{{solr.hdfs.nrtcachingdirectory.maxcachedmb}} to reduce File IO. Defaults can 
lead to very high load on the {{HDFS NameNode}}
 - Increasing {{ramBufferSizeMB}} to {{1024MB}} to further reduce File IO.
 - Reducing the {{maxIndexingThreads}} and relaxing the aggressiveness of the 
{{TieredMergePolicy}} to reduce CPU pressure on the server.
 - Heap, off-heap memory and the HDFS cache size

h5. The problem

From my understanding, I believe {{SolrIO}} works as follows:
 - It accumulates a batch (hard coded to 1000) of {{SolrInputDocument}} which 
it then passes to the underlying {{SolrJ}} client which is an instance of a 
{{CloudSolrClient}}.
 - The {{CloudSolrClient}} will accept the batch, then negotiate with ZooKeeper 
to identify the current shard leader for each document based on the routing 
defined in the Solr collection (implicit routing in my case).
 - The batch of 1000 is then split into sub-collections based on the current
leaders, and those smaller batches are issued concurrently to each SOLR server
hosting the related shard leader
 - The leader then writes to the SOLR TLog and forwards to a replica (if any 
are configured) before returning {{HTTP 200}} confirming receipt of the batch
 - Once all batches are successfully received the {{CloudSolrClient}} returns 
to the {{SolrIO}}

If my understanding is correct it is important to note that the Beam batch of 
1000 docs is actually split up based on the configured number of SOLR shards, 
servers and based on the routing characteristics of the documents contained. 
The comments in the {{SolrIO}} suggest that the author might have assumed the 
batch would be a single http call to one server:
{quote}{{// 1000 for batch size is good enough in many cases,}}
 {{// ex: if document size is large, around 10KB, the request's size will be 
around 10MB}}
 {{// if document seize is small, around 1KB, the request's size will be around 
1MB}}
{quote}
I don't see how Beam could ever really optimise for this, though. There are
cases (like mine) where, as the pipeline developer, I know my routing strategy,
my network bandwidth and my target environments - I would run with a different
profile for each environment to control the true batch sizes observed at the
SOLR server.

 

 


was (Author: timrobertson100):
Thanks for engaging in this discussion [~jkff]. I'll try and provide context 
and justification here in the hope of changing your mind.

On this specific issue I am preparing pipelines to test ingestion speed of SOLR 
from Avro files using Beam on Spark. I am preparing this with configuration to 
run on different hardware - preproduction and production. The SOLR schema is 
approx 200 fields of text and I'm currently ingesting at 250M docs/hr on the 
preproduction cluster.

So far, the tuning I have performed:
h5. Spark

Obvious stuff, like the number of executors (JVMs), the number of tasks each
runs concurrently, and the memory for each JVM.
h5. Parallelism

Because Beam does not allow a repartition of the underlying Spark RDD, the
total parallelism of how the blocks are read from {{Avro}} can be tuned using
e.g. {{--conf spark.default.parallelism=1000}}. Combined with the number of
concurrent tasks you allow Spark to run, this controls the number of concurrent
clients {{SOLR}} sees. Without it, the Avro reader can present fewer tasks
than you wish, and failures can result in large amounts being 

[jira] [Comment Edited] (BEAM-3820) SolrIO: Allow changing batchSize for writes

2018-03-15 Thread Tim Robertson (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400372#comment-16400372
 ] 

Tim Robertson edited comment on BEAM-3820 at 3/15/18 2:56 PM:
--

Thanks for engaging in this discussion [~jkff]. I'll try and provide context 
and justification here in the hope of changing your mind.

On this specific issue I am preparing pipelines to test ingestion speed of SOLR 
from Avro files using Beam on Spark. I am preparing this with configuration to 
run on different hardware - preproduction and production. The SOLR schema is 
approx 200 fields of text and I'm currently ingesting at 250M docs/hr on the 
preproduction cluster.

So far, the tuning I have performed:
h5. Spark

Obvious stuff, like the number of executors (JVMs), the number of tasks each
runs concurrently, and the memory for each JVM.
h5. Parallelism

Because Beam does not allow a repartition of the underlying Spark RDD, the
total parallelism of how the blocks are read from {{Avro}} can be tuned using
e.g. {{--conf spark.default.parallelism=1000}}. Combined with the number of
concurrent tasks you allow Spark to run, this controls the number of concurrent
clients {{SOLR}} sees. Without it, the Avro reader can present fewer tasks
than you wish, and failures can result in large amounts of work being retried
(BEAM-3848).
h5. SOLR server tuning

Running with defaults can result in CPU resource starvation on underpowered 
machines so I've tuned the following:
 - Increasing {{solr.hdfs.nrtcachingdirectory.maxmergesizemb}} and 
{{solr.hdfs.nrtcachingdirectory.maxcachedmb}} to reduce File IO. Defaults can 
lead to very high load on the {{HDFS NameNode}}
 - Increasing {{ramBufferSizeMB}} to {{1024MB}} to further reduce File IO.
 - Reducing the {{maxIndexingThreads}} and relaxing the aggressiveness of the 
{{TieredMergePolicy}} to reduce CPU pressure on the server.
 - Heap, off-heap memory and the HDFS cache size

h5. The problem

From my understanding, I believe {{SolrIO}} works as follows:
 - It accumulates a batch (hard coded to 1000) of {{SolrInputDocument}} which 
it then passes to the underlying {{SolrJ}} client which is an instance of a 
{{CloudSolrClient}}.
 - The {{CloudSolrClient}} will accept the batch, then negotiate with ZooKeeper 
to identify the current shard leader for each document based on the routing 
defined in the Solr collection (implicit routing in my case).
 - The batch of 1000 is then split into sub-collections based on the current
leaders, and those smaller batches are issued to the SOLR server hosting the
relevant shard leader
 - The leader then writes to the SOLR TLog and forwards to a replica (if any 
are configured) before returning {{HTTP 200}} confirming receipt of the batch
 - Once all batches are successfully received the {{CloudSolrClient}} returns 
to the {{SolrIO}}

If my understanding is correct it is important to note that the Beam batch of 
1000 docs is actually split up based on the configured number of SOLR shards, 
servers and based on the routing characteristics of the documents contained. 
The comments in the {{SolrIO}} suggest that the author might have assumed the 
batch would be a single http call to one server:
{quote}{{// 1000 for batch size is good enough in many cases,}}
 {{// ex: if document size is large, around 10KB, the request's size will be 
around 10MB}}
 {{// if document seize is small, around 1KB, the request's size will be around 
1MB}}
{quote}
I don't see how Beam could ever really optimise for this, though. There are
cases (like mine) where, as the pipeline developer, I know my routing strategy,
my network bandwidth and my target environments - I would run with a different
profile for each environment to control the true batch sizes observed at the
SOLR server.

 

 


was (Author: timrobertson100):
Thanks for engaging in this discussion [~jkff]. I'll try and provide context 
and justification here in the hope of changing your mind.

On this specific issue I am preparing pipelines to test ingestion speed of SOLR 
from Avro files using Beam on Spark. I am preparing this with configuration to 
run on different hardware - preproduction and production. The SOLR schema is 
approx 200 fields of text and I'm currently ingesting at 250M docs/hr on the 
preproduction cluster.

So far, the tuning I have performed:
h5. Spark

Obvious stuff, like the number of executors (JVMs), the number of tasks each
runs concurrently, and the memory for each JVM.
h5. Parallelism

Because Beam does not allow a repartition of the underlying Spark RDD, the
total parallelism of how the blocks are read from {{Avro}} can be tuned using
e.g. {{--conf spark.default.parallelism=1000}}. Combined with the number of
concurrent tasks you allow Spark to run, this controls the number of concurrent
clients {{SOLR}} sees. Without it, the Avro reader can present fewer tasks
than you wish, and failures can result in large amounts of work being retried
(BEAM-3848).
h5. 

[jira] [Comment Edited] (BEAM-3820) SolrIO: Allow changing batchSize for writes

2018-03-15 Thread Tim Robertson (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400372#comment-16400372
 ] 

Tim Robertson edited comment on BEAM-3820 at 3/15/18 1:23 PM:
--

Thanks for engaging in this discussion [~jkff]. I'll try and provide context 
and justification here in the hope of changing your mind.

On this specific issue I am preparing pipelines to test ingestion speed of SOLR 
from Avro files using Beam on Spark. I am preparing this with configuration to 
run on different hardware - preproduction and production. The SOLR schema is 
approx 200 fields of text and I'm currently ingesting at 250M docs/hr on the 
preproduction cluster.

So far, the tuning I have performed:
h5. Spark

Obvious stuff, like the number of executors (JVMs), the number of tasks each
runs concurrently, and the memory for each JVM.
h5. Parallelism

Because Beam does not allow a repartition of the underlying Spark RDD, the
total parallelism of how the blocks are read from {{Avro}} can be tuned using
e.g. {{--conf spark.default.parallelism=1000}}. Combined with the number of
concurrent tasks you allow Spark to run, this controls the number of concurrent
clients {{SOLR}} sees. Without it, the Avro reader can present fewer tasks
than you wish, and failures can result in large amounts of work being retried
(BEAM-3848).
h5. SOLR server tuning

Running with defaults can result in CPU resource starvation on underpowered 
machines so I've tuned the following:
 - Increasing {{solr.hdfs.nrtcachingdirectory.maxmergesizemb}} and 
{{solr.hdfs.nrtcachingdirectory.maxcachedmb}} to reduce File IO. Defaults can 
lead to very high load on the {{HDFS NameNode}}
 - Increasing {{ramBufferSizeMB}} to {{1024MB}} to further reduce File IO.
 - Reducing the {{maxIndexingThreads}} and relaxing the aggressiveness of the 
{{TieredMergePolicy}} to reduce CPU pressure on the server.

h5. The problem

From my understanding, I believe {{SolrIO}} works as follows:
 - It accumulates a batch (hard coded to 1000) of {{SolrInputDocument}} which 
it then passes to the underlying {{SolrJ}} client which is an instance of a 
{{CloudSolrClient}}.
 - The {{CloudSolrClient}} will accept the batch, then negotiate with ZooKeeper 
to identify the current shard leader for each document based on the routing 
defined in the Solr collection (implicit routing in my case).
 - The batch of 1000 is then split into sub-collections based on the current
leaders, and those smaller batches are issued to the SOLR server hosting the
relevant shard leader
 - The leader then writes to the SOLR TLog and forwards to a replica (if any 
are configured) before returning {{HTTP 200}} confirming receipt of the batch
 - Once all batches are successfully received the {{CloudSolrClient}} returns 
to the {{SolrIO}}

If my understanding is correct it is important to note that the Beam batch of 
1000 docs is actually split up based on the configured number of SOLR shards, 
servers and based on the routing characteristics of the documents contained. 
The comments in the {{SolrIO}} suggest that the author might have assumed the 
batch would be a single http call to one server:
{quote}{{// 1000 for batch size is good enough in many cases,}}
 {{// ex: if document size is large, around 10KB, the request's size will be 
around 10MB}}
 {{// if document seize is small, around 1KB, the request's size will be around 
1MB}}
{quote}
I don't see how Beam could ever really optimise for this, though. There are
cases (like mine) where, as the pipeline developer, I know my routing strategy,
my network bandwidth and my target environments - I would run with a different
profile for each environment to control the true batch sizes observed at the
SOLR server.

 

 


was (Author: timrobertson100):
Thanks for engaging in this discussion [~jkff]. I'll try and provide context 
and justification here in the hope of changing your mind.

On this specific issue I am preparing pipelines to test ingestion speed of SOLR 
from Avro files using Beam on Spark. I am preparing this with configuration to 
run on different hardware - preproduction and production. The SOLR schema is 
approx 200 fields of text and I'm currently ingesting at 250M docs/hr on the 
preproduction cluster.

So far, the tuning I have performed:
h5. Spark

Obvious stuff, like the number of executors (JVMs), the number of tasks each
runs concurrently, and the memory for each JVM.
h5. Parallelism

Because Beam does not allow a repartition of the underlying Spark RDD, the
total parallelism of how the blocks are read from {{Avro}} can be tuned using
e.g. {{--conf spark.default.parallelism=1000}}. Combined with the number of
concurrent tasks you allow Spark to run, this controls the number of concurrent
clients {{SOLR}} sees. Without it, the Avro reader can present fewer tasks
than you wish, and failures can result in large amounts being retried
(BEAM-3848).
h5. SOLR server tuning

Running defaults can