[jira] [Commented] (BEAM-7438) Distribution and Gauge metrics are not being exported to the Flink dashboard or Prometheus IO

2020-05-12 Thread Akshay Iyangar (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17105192#comment-17105192
 ] 

Akshay Iyangar commented on BEAM-7438:
--

[~mxm] - Facing the same issue; will have a PR out for this soon.

> Distribution and Gauge metrics are not being exported to the Flink dashboard 
> or Prometheus IO
> --
>
> Key: BEAM-7438
> URL: https://issues.apache.org/jira/browse/BEAM-7438
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.12.0, 2.13.0
>Reporter: Ricardo Bordon
>Assignee: Akshay Iyangar
>Priority: Major
> Attachments: image-2019-05-29-11-24-36-911.png, 
> image-2019-05-29-11-26-49-685.png
>
>
> Distribution and gauge metrics are not visible in the Flink dashboard or in 
> Prometheus IO.
> I was able to debug the runner code and see that these metrics are being 
> updated via *FlinkMetricContainer#updateDistributions()* and 
> *FlinkMetricContainer#updateGauges()* (meaning they are treated as "attempted 
> metrics"), but they are not visible in the Flink dashboard or Prometheus. On 
> the other hand, *Counter* metrics work as expected.
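
For reference, a minimal repro sketch of a DoFn that updates all three metric 
types (the class and metric names below are made up for illustration); on the 
Flink dashboard only the counter shows up:
{code:java}
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

class MetricsRepro extends DoFn<String, String> {
  private final Counter elements = Metrics.counter(MetricsRepro.class, "elements");
  private final Distribution sizes = Metrics.distribution(MetricsRepro.class, "sizes");
  private final Gauge lastSize = Metrics.gauge(MetricsRepro.class, "lastSize");

  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<String> out) {
    elements.inc();                 // exported to Flink/Prometheus as expected
    sizes.update(element.length()); // updated via updateDistributions() but not exported
    lastSize.set(element.length()); // updated via updateGauges() but not exported
    out.output(element);
  }
}
{code}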



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (BEAM-7438) Distribution and Gauge metrics are not being exported to the Flink dashboard or Prometheus IO

2020-05-12 Thread Akshay Iyangar (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Akshay Iyangar reassigned BEAM-7438:


Assignee: Akshay Iyangar

> Distribution and Gauge metrics are not being exported to the Flink dashboard 
> or Prometheus IO
> --
>
> Key: BEAM-7438
> URL: https://issues.apache.org/jira/browse/BEAM-7438
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.12.0, 2.13.0
>Reporter: Ricardo Bordon
>Assignee: Akshay Iyangar
>Priority: Major
> Attachments: image-2019-05-29-11-24-36-911.png, 
> image-2019-05-29-11-26-49-685.png
>
>
> Distribution and gauge metrics are not visible in the Flink dashboard or in 
> Prometheus IO.
> I was able to debug the runner code and see that these metrics are being 
> updated via *FlinkMetricContainer#updateDistributions()* and 
> *FlinkMetricContainer#updateGauges()* (meaning they are treated as "attempted 
> metrics"), but they are not visible in the Flink dashboard or Prometheus. On 
> the other hand, *Counter* metrics work as expected.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-9742) Add ability to pass FluentBackoff to JdbcIO.Write

2020-04-10 Thread Akshay Iyangar (Jira)
Akshay Iyangar created BEAM-9742:


 Summary: Add ability to pass FluentBackoff to JdbcIO.Write
 Key: BEAM-9742
 URL: https://issues.apache.org/jira/browse/BEAM-9742
 Project: Beam
  Issue Type: Improvement
  Components: io-java-jdbc
Reporter: Akshay Iyangar


Currently, the FluentBackoff is hardcoded with `maxRetries` and 
`initialBackoff`.
It would be helpful if the client were able to pass these values.
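
For illustration, a sketch of the kind of backoff JdbcIO.Write builds 
internally with Beam's FluentBackoff; the ask here is to let the caller supply 
these two knobs (the helper below is hypothetical, not an existing API):
{code:java}
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.FluentBackoff;
import org.joda.time.Duration;

class BackoffSketch {
  // Hypothetical: JdbcIO.Write would feed caller-supplied values through to
  // its internal retry loop instead of using fixed constants.
  static BackOff retryBackoff(int maxRetries, Duration initialBackoff) {
    return FluentBackoff.DEFAULT
        .withMaxRetries(maxRetries)         // currently hardcoded in JdbcIO.Write
        .withInitialBackoff(initialBackoff) // currently hardcoded in JdbcIO.Write
        .backoff();                         // consulted between retry attempts
  }
}
{code}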



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-9742) Add ability to pass FluentBackoff to JdbcIO.Write

2020-04-10 Thread Akshay Iyangar (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Akshay Iyangar updated BEAM-9742:
-
Status: Open  (was: Triage Needed)

> Add ability to pass FluentBackoff to JdbcIO.Write
> -
>
> Key: BEAM-9742
> URL: https://issues.apache.org/jira/browse/BEAM-9742
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-jdbc
>Reporter: Akshay Iyangar
>Assignee: Akshay Iyangar
>Priority: Minor
>
> Currently, the FluentBackoff is hardcoded with `maxRetries` and 
> `initialBackoff`.
> It would be helpful if the client were able to pass these values.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (BEAM-9742) Add ability to pass FluentBackoff to JdbcIO.Write

2020-04-10 Thread Akshay Iyangar (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Akshay Iyangar reassigned BEAM-9742:


Assignee: Akshay Iyangar

> Add ability to pass FluentBackoff to JdbcIO.Write
> -
>
> Key: BEAM-9742
> URL: https://issues.apache.org/jira/browse/BEAM-9742
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-jdbc
>Reporter: Akshay Iyangar
>Assignee: Akshay Iyangar
>Priority: Minor
>
> Currently, the FluentBackoff is hardcoded with `maxRetries` and 
> `initialBackoff`.
> It would be helpful if the client were able to pass these values.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (BEAM-9720) Add custom AWS Http Client Configuration capability for AWS client 1.0/2.0

2020-04-07 Thread Akshay Iyangar (Jira)
Akshay Iyangar created BEAM-9720:


 Summary: Add custom AWS Http Client Configuration capability for 
AWS client 1.0/2.0
 Key: BEAM-9720
 URL: https://issues.apache.org/jira/browse/BEAM-9720
 Project: Beam
  Issue Type: Improvement
  Components: io-java-aws
Reporter: Akshay Iyangar
Assignee: Akshay Iyangar


Currently, there is no way to pass a custom client configuration to the AWS 
client service.

Enable a way to pass these custom client configuration options as pipeline 
options.
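
A minimal sketch of what this could look like for the v1 SDK; the 
setClientConfiguration hook on AwsOptions is the proposed capability, not an 
existing API at the time of filing:
{code:java}
import com.amazonaws.ClientConfiguration;
import org.apache.beam.sdk.io.aws.options.AwsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CustomAwsClientConfig {
  public static void main(String[] args) {
    // Custom HTTP settings for the AWS v1 SDK client.
    ClientConfiguration clientConfig = new ClientConfiguration();
    clientConfig.setMaxConnections(100);       // e.g. raise the connection pool size
    clientConfig.setConnectionTimeout(60_000); // milliseconds
    clientConfig.setSocketTimeout(60_000);     // milliseconds

    // Proposed: hand the configuration to the AWS IOs through pipeline options.
    AwsOptions options = PipelineOptionsFactory.fromArgs(args).as(AwsOptions.class);
    options.setClientConfiguration(clientConfig);
  }
}
{code}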



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (BEAM-8212) StatefulParDoFn creates GC timers for every record

2020-03-11 Thread Akshay Iyangar (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Akshay Iyangar reassigned BEAM-8212:


Assignee: Akshay Iyangar

> StatefulParDoFn creates GC timers for every record 
> ---
>
> Key: BEAM-8212
> URL: https://issues.apache.org/jira/browse/BEAM-8212
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Akshay Iyangar
>Assignee: Akshay Iyangar
>Priority: Major
>
> Hi,
> Currently the StatefulParDoFn creates timers for all the records:
> [https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L211]
> This becomes a problem if you are using GlobalWindows for streaming, where 
> these timers get created and never get cleaned up since the window will 
> never close.
> This is a problem especially if you are memory-bound in RocksDB, where these 
> timers take up space and slow the pipelines considerably.
> I was wondering whether, if the pipeline runs in global windows, we should 
> avoid adding timers at all.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-8212) StatefulParDoFn creates GC timers for every record

2020-03-11 Thread Akshay Iyangar (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Akshay Iyangar updated BEAM-8212:
-
Status: Open  (was: Triage Needed)

> StatefulParDoFn creates GC timers for every record 
> ---
>
> Key: BEAM-8212
> URL: https://issues.apache.org/jira/browse/BEAM-8212
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Akshay Iyangar
>Priority: Major
>
> Hi,
> Currently the StatefulParDoFn creates timers for all the records:
> [https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L211]
> This becomes a problem if you are using GlobalWindows for streaming, where 
> these timers get created and never get cleaned up since the window will 
> never close.
> This is a problem especially if you are memory-bound in RocksDB, where these 
> timers take up space and slow the pipelines considerably.
> I was wondering whether, if the pipeline runs in global windows, we should 
> avoid adding timers at all.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (BEAM-8212) StatefulParDoFn creates GC timers for every record

2019-09-26 Thread Akshay Iyangar (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16937228#comment-16937228
 ] 

Akshay Iyangar edited comment on BEAM-8212 at 9/26/19 11:19 PM:


 
{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import com.google.common.io.BaseEncoding;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.joda.time.Instant;
import org.junit.Test;

public class TestDecodeTimer {
  @Test
  public void gctimerValue() throws IOException, ClassNotFoundException {

    StateNamespace stateNamespace =
        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);

    String GC_TIMER_ID = "__StatefulParDoGcTimerId";
    // The runner sets the GC timer like this:
    // timerInternals.setTimer(
    //     StateNamespaces.window(windowCoder, window), GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);

    // Encode the timer id and namespace key the same way the runner does.
    ByteArrayOutputStream outStream = new ByteArrayOutputStream();
    StringUtf8Coder.of().encode(GC_TIMER_ID, outStream);
    StringUtf8Coder.of().encode(stateNamespace.stringKey(), outStream);

    System.out.println("The output stream is: " + outStream.toString()); // __StatefulParDoGcTimerId//

    // Hex representation of the encoded prefix.
    String encode = BaseEncoding.base16().encode(outStream.toByteArray());
    System.out.println("The encoded string is " + encode);
    // 185F5F537461746566756C506172446F476354696D65724964022F2F
    // Everything after this prefix is the GC timer value; also strip the
    // event-time suffix below.

    ByteArrayOutputStream outStream1 = new ByteArrayOutputStream();
    StringUtf8Coder.of().encode(TimeDomain.EVENT_TIME.toString(), outStream1);
    String encode1 = BaseEncoding.base16().encode(outStream1.toByteArray());
    System.out.println("The encoded1 string is " + encode1); // 0A4556454E545F54494D45
    System.out.println("Total length of the encoded key: " + outStream.size());

    // Example key pulled from RocksDB.
    String decode =
        "008020C49BA0BCF7F901006A6176612E6E696F2E4865617042797465427565F20100010C0107313831303639000C0100185F5F537461746566756C506172446F476354696D65724964022F2F8020C49BA0BCF7F80A4556454E545F54494D45";

    // The timer value is whatever sits between
    // 185F5F537461746566756C506172446F476354696D65724964022F2F and
    // 0A4556454E545F54494D45, i.e. 8020C49BA0BCF7F8.
    Instant timeDecode =
        InstantCoder.of()
            .decode(new ByteArrayInputStream(BaseEncoding.base16().decode("8020C49BA0BCF7F8")));

    System.out.println("GC timer for GlobalWindow is " + timeDecode); // 294247-01-10T04:00:54.775Z
    // This is effectively +infinity, so these timers would never be cleaned
    // up because the window never closes.

    // Cross-check against the window bounds.
    System.out.println("MAX value: " + BoundedWindow.TIMESTAMP_MAX_VALUE);
    System.out.println("MAX: " + GlobalWindow.TIMESTAMP_MAX_VALUE);
  }
}
{code}
So I wrote a test to verify the values being generated for each of the events. 
I took one key from RocksDB to analyze, and the timer is +infinity, i.e. 
GlobalWindow.TIMESTAMP_MAX_VALUE, which makes sense as it's a global window.

 

Also, I didn't see any keys associated with timers in the StatefulParDoFn:
{code:java}
rocksdb_ldb --db=db --column_family=_timer_state/event_beam-timer scan --max_keys=100 --key_hex
{code}
It returned zero keys.

 

I ran a big pipeline to see the effect of having the timers disabled.

At the 1-hour mark, with GlobalWindows and RocksDB as the state backend, the 
pipeline had consumed 432 million records with node memory usage at roughly 
50%. The node is a 32GB EKS node where I gave 15GB to the JVM.

With the timers enabled, the same pipeline took 1 hr 30 mins to read 432 
million records, with total node memory usage at 62%.

So I think it is fair to assume that for global windows the timers can affect 
pipeline performance.

[~mxm] and [~NathanHowell] ^^

[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()

2019-09-25 Thread Akshay Iyangar (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16937955#comment-16937955
 ] 

Akshay Iyangar commented on BEAM-8303:
--

Can you check which security provider is being used by your pipeline? We faced 
the same issue and found that it was using the SUN provider.

Changing it to Conscrypt or Bouncy Castle will help you resolve the issue.
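
For example, a minimal sketch of swapping in Conscrypt before the pipeline 
starts (this assumes the org.conscrypt:conscrypt-openjdk-uber dependency is on 
the classpath):
{code:java}
import java.security.Security;
import org.conscrypt.Conscrypt;

public class Main {
  public static void main(String[] args) {
    // Register Conscrypt ahead of the default SUN provider before building the pipeline.
    Security.insertProviderAt(Conscrypt.newProvider(), 1);
    // ... construct and run the Beam pipeline as usual ...
  }
}
{code}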

> Filesystems not properly registered using FileIO.write()
> 
>
> Key: BEAM-8303
> URL: https://issues.apache.org/jira/browse/BEAM-8303
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.15.0
>Reporter: Preston Koprivica
>Priority: Major
>
> I’m getting the following error when attempting to use the FileIO APIs 
> (beam-2.15.0) and integrating with AWS S3.  I have set up the PipelineOptions 
> with all the relevant AWS options, so the filesystem registry **should** be 
> properly seeded by the time the graph is compiled and executed:
> {code:java}
>  java.lang.IllegalArgumentException: No filesystem found for scheme s3
>     at 
> org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>     at 
> org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>     at 
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
>     at 
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     at 
> org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
>     at 
> org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
>     at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>     at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
>     at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
>     at 
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
>     at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
>     at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>     at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at 
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
>  {code}
> For reference, the write code resembles this:
> {code:java}
>  FileIO.Write write = FileIO.write()
>     .via(ParquetIO.sink(schema))
>     .to(options.getOutputDir()) // will be something like: s3:///
>     .withSuffix(".parquet");
> records.apply(String.format("Write(%s)", options.getOutputDir()), 
> write);{code}
> The issue does not appear to be related to ParquetIO.sink().  I am able to 
> reliably reproduce the issue using JSON formatted records and TextIO.sink(), 
> as well.  Moreover, AvroIO is affected if the withWindowedWrites() option is 
> added.
> Just trying some different knobs, I went ahead and set the following option:
> {code:java}
> write = write.withNoSpilling();{code}
> This actually seemed to fix the issue, only to have it reemerge as I scaled 
> up the data set size.  The stack trace, while very similar, reads:
> {code:java}
>  java.lang.IllegalArgumentException: No filesystem found for scheme s3
>     at 
> org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>     at 
> org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>     at 
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
>     at 
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
>     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
>     at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>     at 
> 

[jira] [Commented] (BEAM-8212) StatefulParDoFn creates GC timers for every record

2019-09-24 Thread Akshay Iyangar (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16937228#comment-16937228
 ] 

Akshay Iyangar commented on BEAM-8212:
--

 
{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import com.google.common.io.BaseEncoding;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.joda.time.Instant;
import org.junit.Test;

public class TestDecodeTimer {
  @Test
  public void gctimerValue() throws IOException, ClassNotFoundException {

    StateNamespace stateNamespace =
        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);

    String GC_TIMER_ID = "__StatefulParDoGcTimerId";
    // The runner sets the GC timer like this:
    // timerInternals.setTimer(
    //     StateNamespaces.window(windowCoder, window), GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);

    // Encode the timer id and namespace key the same way the runner does.
    ByteArrayOutputStream outStream = new ByteArrayOutputStream();
    StringUtf8Coder.of().encode(GC_TIMER_ID, outStream);
    StringUtf8Coder.of().encode(stateNamespace.stringKey(), outStream);

    System.out.println("The output stream is: " + outStream.toString()); // __StatefulParDoGcTimerId//

    // Hex representation of the encoded prefix.
    String encode = BaseEncoding.base16().encode(outStream.toByteArray());
    System.out.println("The encoded string is " + encode);
    // 185F5F537461746566756C506172446F476354696D65724964022F2F
    // Everything after this prefix is the GC timer value; also strip the
    // event-time suffix below.

    ByteArrayOutputStream outStream1 = new ByteArrayOutputStream();
    StringUtf8Coder.of().encode(TimeDomain.EVENT_TIME.toString(), outStream1);
    String encode1 = BaseEncoding.base16().encode(outStream1.toByteArray());
    System.out.println("The encoded1 string is " + encode1); // 0A4556454E545F54494D45
    System.out.println("Total length of the encoded key: " + outStream.size());

    // Example key pulled from RocksDB.
    String decode =
        "008020C49BA0BCF7F901006A6176612E6E696F2E4865617042797465427565F20100010C0107313831303639000C0100185F5F537461746566756C506172446F476354696D65724964022F2F8020C49BA0BCF7F80A4556454E545F54494D45";

    // The timer value is whatever sits between
    // 185F5F537461746566756C506172446F476354696D65724964022F2F and
    // 0A4556454E545F54494D45, i.e. 8020C49BA0BCF7F8.
    Instant timeDecode =
        InstantCoder.of()
            .decode(new ByteArrayInputStream(BaseEncoding.base16().decode("8020C49BA0BCF7F8")));

    System.out.println("GC timer for GlobalWindow is " + timeDecode); // 294247-01-10T04:00:54.775Z
    // This is effectively +infinity, so these timers would never be cleaned
    // up because the window never closes.

    // Cross-check against the window bounds.
    System.out.println("MAX value: " + BoundedWindow.TIMESTAMP_MAX_VALUE);
    System.out.println("MAX: " + GlobalWindow.TIMESTAMP_MAX_VALUE);
  }
}
{code}
So I wrote a test to verify the values being generated for each of the events. 
I took one key from RocksDB to analyze, and the timer is +infinity, i.e. 
GlobalWindow.TIMESTAMP_MAX_VALUE, which makes sense as it's a global window.

 

I also went ahead and disabled the timers for global windows to do some 
benchmarking, and found that RocksDB no longer generates any state for 
WindowDoFnOperator, which was previously generated as below.
{code:java}
/rocksdb/job__op_WindowDoFnOperator_e2c1f521beded61187c1d16f3c146358__3_3__uuid_4e0e102b-ffcd-4111-80f7-b9a8f318d04a/db
{code}
 

Also, I didn't see any keys associated with timers in the StatefulParDoFn:
{code:java}
rocksdb_ldb --db=db --column_family=_timer_state/event_beam-timer scan --max_keys=100 --key_hex
{code}
It returned zero keys.

 

I'm running the pipeline to get the exact benchmarks and will keep you updated, 
but one thing right off the bat is that we see fewer state operators. That 
means RocksDB will have more memory to play with, as it has operator * 
parallelism fewer states compared to the previous run. I will update shortly 
w.r.t. the speed and throughput of the pipeline.

> StatefulParDoFn creates GC timers for every record 
> ---
>
> Key: BEAM-8212
> URL: https://issues.apache.org/jira/browse/BEAM-8212
> Project: Beam
>  Issue Type: Bug
>  Components: beam-community
>Reporter: Akshay Iyangar
>Assignee: Aizhamal Nurmamat kyzy
>Priority: Major
>
> Hi,
> Currently the StatefulParDoFn creates timers for all the records:
> [https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L211]
> This becomes a problem if you are using GlobalWindows for streaming, where 
> these timers get created and never get cleaned up since the window will 
> never close.
> This is a problem especially if you are memory-bound in RocksDB, where these 
> timers take up space and slow the pipelines considerably.
> I was wondering whether, if the pipeline runs in global windows, we should 
> avoid adding timers at all.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

[jira] [Commented] (BEAM-8212) StatefulParDoFn creates GC timers for every record

2019-09-20 Thread Akshay Iyangar (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16934618#comment-16934618
 ] 

Akshay Iyangar commented on BEAM-8212:
--

[~mxm] - Hey, can you help me with this?

> StatefulParDoFn creates GC timers for every record 
> ---
>
> Key: BEAM-8212
> URL: https://issues.apache.org/jira/browse/BEAM-8212
> Project: Beam
>  Issue Type: Bug
>  Components: beam-community
>Reporter: Akshay Iyangar
>Assignee: Aizhamal Nurmamat kyzy
>Priority: Major
>
> Hi,
> Currently the StatefulParDoFn creates timers for all the records:
> [https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L211]
> This becomes a problem if you are using GlobalWindows for streaming, where 
> these timers get created and never get cleaned up since the window will 
> never close.
> This is a problem especially if you are memory-bound in RocksDB, where these 
> timers take up space and slow the pipelines considerably.
> I was wondering whether, if the pipeline runs in global windows, we should 
> avoid adding timers at all.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-8212) StatefulParDoFn creates GC timers for every record

2019-09-11 Thread Akshay Iyangar (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16927893#comment-16927893
 ] 

Akshay Iyangar commented on BEAM-8212:
--

[~NathanHowell] - Adding Nate to add anything that I may have missed in the 
description.
[~randomsamples]

> StatefulParDoFn creates GC timers for every record 
> ---
>
> Key: BEAM-8212
> URL: https://issues.apache.org/jira/browse/BEAM-8212
> Project: Beam
>  Issue Type: Bug
>  Components: beam-community
>Reporter: Akshay Iyangar
>Assignee: Aizhamal Nurmamat kyzy
>Priority: Major
>
> Hi,
> Currently the StatefulParDoFn creates timers for all the records:
> [https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L211]
> This becomes a problem if you are using GlobalWindows for streaming, where 
> these timers get created and never get cleaned up since the window will 
> never close.
> This is a problem especially if you are memory-bound in RocksDB, where these 
> timers take up space and slow the pipelines considerably.
> I was wondering whether, if the pipeline runs in global windows, we should 
> avoid adding timers at all.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (BEAM-8212) StatefulParDoFn creates GC timers for every record

2019-09-11 Thread Akshay Iyangar (Jira)
Akshay Iyangar created BEAM-8212:


 Summary: StatefulParDoFn creates GC timers for every record 
 Key: BEAM-8212
 URL: https://issues.apache.org/jira/browse/BEAM-8212
 Project: Beam
  Issue Type: Bug
  Components: beam-community
Reporter: Akshay Iyangar
Assignee: Aizhamal Nurmamat kyzy


Hi,

Currently the StatefulParDoFn creates timers for all the records:
[https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L211]

This becomes a problem if you are using GlobalWindows for streaming, where these 
timers get created and never get cleaned up since the window will never close.

This is a problem especially if you are memory-bound in RocksDB, where these 
timers take up space and slow the pipelines considerably.

I was wondering whether, if the pipeline runs in global windows, we should avoid 
adding timers at all.
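
To make the suggestion concrete, here is a rough sketch of the guard, modeled 
on StatefulDoFnRunner's timer-setting path (the names are approximate and this 
is an assumed fix shape, not an actual patch):
{code:java}
// Hypothetical guard inside StatefulDoFnRunner: only schedule the GC timer
// when the window can actually close. For GlobalWindow the garbage-collection
// time is effectively +infinity, so the timer never fires and only consumes state.
if (!GlobalWindow.INSTANCE.equals(window)) {
  Instant gcTime =
      LateDataUtils.garbageCollectionTime(window, windowingStrategy).plus(GC_DELAY_MS);
  timerInternals.setTimer(
      StateNamespaces.window(windowCoder, window), GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
}
{code}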

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Comment Edited] (BEAM-7442) Bounded Reads for Flink Runner fails with OOM

2019-05-29 Thread Akshay Iyangar (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16851183#comment-16851183
 ] 

Akshay Iyangar edited comment on BEAM-7442 at 5/29/19 6:47 PM:
---

Yes, I have taken those comments into consideration and should have the PR out 
shortly. I was not able to assign the ticket to myself; I would appreciate it 
if someone could help me with that.


was (Author: aiyangar):
Yes, I have taken those comments into consideration and should have the PR out 
shortly. I was not able to assign the ticket to myself; I would appreciate it 
if someone could help me with that.

> Bounded Reads for Flink Runner fails with OOM
> -
>
> Key: BEAM-7442
> URL: https://issues.apache.org/jira/browse/BEAM-7442
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Akshay Iyangar
>Assignee: Maximilian Michels
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When the Flink runner reads from a bounded source and the total number of 
> files is huge, FlinkRunner throws an OOM error. This happens because the 
> current implementation doesn't read the files sequentially but 
> simultaneously, causing all of the files to be in memory, which quickly 
> breaks the cluster.
> Solution: wrap the `UnboundedReadFromBoundedSource` class so that when the 
> stream is a bounded source, it is read sequentially using a queue.
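
Conceptually, the proposed behavior looks something like the sketch below (not 
Beam's actual implementation; handOff() is a placeholder for emitting 
downstream):
{code:java}
import java.util.ArrayDeque;
import java.util.Queue;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;

class SequentialBoundedRead {
  // Drain bounded-source splits one at a time from a queue so that only one
  // reader is open, and only one split's data is buffered, at any moment.
  static <T> void readSequentially(
      BoundedSource<T> source, long desiredBundleSizeBytes, PipelineOptions options)
      throws Exception {
    Queue<BoundedSource<T>> pending =
        new ArrayDeque<>(source.split(desiredBundleSizeBytes, options));
    while (!pending.isEmpty()) {
      BoundedSource<T> split = pending.poll();
      try (BoundedSource.BoundedReader<T> reader = split.createReader(options)) {
        for (boolean more = reader.start(); more; more = reader.advance()) {
          handOff(reader.getCurrent()); // placeholder downstream hand-off
        }
      }
    }
  }

  static <T> void handOff(T element) {} // placeholder for emitting downstream
}
{code}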



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (BEAM-7442) Bounded Reads for Flink Runner fails with OOM

2019-05-29 Thread Akshay Iyangar (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16851183#comment-16851183
 ] 

Akshay Iyangar edited comment on BEAM-7442 at 5/29/19 6:47 PM:
---

Yes, I have taken those comments into consideration and should have the PR out 
shortly. I was not able to assign the ticket to myself; I would appreciate it 
if someone could help me with that.
Cool, you have already added the changes. Sweet, I didn't see that.


was (Author: aiyangar):
Yes, I have taken those comments into consideration and should have the PR out 
shortly. I was not able to assign the ticket to myself; I would appreciate it 
if someone could help me with that.

> Bounded Reads for Flink Runner fails with OOM
> -
>
> Key: BEAM-7442
> URL: https://issues.apache.org/jira/browse/BEAM-7442
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Akshay Iyangar
>Assignee: Maximilian Michels
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When the Flink runner reads from a bounded source and the total number of 
> files is huge, FlinkRunner throws an OOM error. This happens because the 
> current implementation doesn't read the files sequentially but 
> simultaneously, causing all of the files to be in memory, which quickly 
> breaks the cluster.
> Solution: wrap the `UnboundedReadFromBoundedSource` class so that when the 
> stream is a bounded source, it is read sequentially using a queue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-7442) Bounded Reads for Flink Runner fails with OOM

2019-05-29 Thread Akshay Iyangar (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16851183#comment-16851183
 ] 

Akshay Iyangar commented on BEAM-7442:
--

Yes, I have taken those comments into consideration and should have the PR out 
shortly. I was not able to assign the ticket to myself; I would appreciate it 
if someone could help me with that.

> Bounded Reads for Flink Runner fails with OOM
> -
>
> Key: BEAM-7442
> URL: https://issues.apache.org/jira/browse/BEAM-7442
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Akshay Iyangar
>Assignee: Maximilian Michels
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When the Flink runner reads from a bounded source and the total number of 
> files is huge, FlinkRunner throws an OOM error. This happens because the 
> current implementation doesn't read the files sequentially but 
> simultaneously, causing all of the files to be in memory, which quickly 
> breaks the cluster.
> Solution: wrap the `UnboundedReadFromBoundedSource` class so that when the 
> stream is a bounded source, it is read sequentially using a queue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)