Re: Error During ElasticsearchIO read

2021-03-07 Thread Mohil Khare
Hello,
I guess "Operation timed out" was more of a networking issue (curl was also
failing, which I should have tried first). When I tried with a different
Elasticsearch cluster, I didn't get this error message.

However, I am still unable to read because of the following:

{"error":
{"root_cause":[{"type":"illegal_argument_exception","reason":"Cannot parse
scroll id"}],"type":"illegal_argument_exception","reason":"Cannot parse
scroll
id","caused_by":{"type":"array_index_out_of_bounds_exception","reason":"arraycopy:
last source index 1668257 out of bounds for byte[3]"}},"status":400}

at org.elasticsearch.client.RestClient.convertResponse (RestClient.java:283)
at org.elasticsearch.client.RestClient.performRequest (RestClient.java:261)
at org.elasticsearch.client.RestClient.performRequest (RestClient.java:235)

at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchReader.close (ElasticsearchIO.java:918)
at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.close (WorkerCustomSources.java:632)
at org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader$NativeReaderIterator.abort (NativeReader.java:179)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.abort (ReadOperation.java:371)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.abort (ReadOperation.java:256)
at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute (MapTaskExecutor.java:91)


Any help would be greatly appreciated.

Thanks and Regards
Mohil

On Sun, Mar 7, 2021 at 2:58 PM Mohil Khare  wrote:

> Hello ElasticSearchIO and beam users/developers,
>
> I am on Beam 2.23.0 and elasticsearch 6.8
>
> I have been using elasticsearchIO.write() successfully.
> For the first time, I am trying to use elasticsearchIO.read() because I have
> a use case where I want to read data from one Elasticsearch cluster,
> modify the data and then write it to another Elasticsearch cluster.
>
> My read transform is very simple:
>
> Pipeline p = input.getPipeline();
>
> return p
>     .apply("Read_From_ES", ElasticsearchIO.read()
>         .withQuery(query)
>         .withBatchSize(50)
>         .withScrollKeepalive("3m")
>         .withConnectionConfiguration(
>             ElasticsearchIO.ConnectionConfiguration.create(
>                     esConnectionParams.getElasticsearchEndpoints(), indexName, "_doc")
>                 .withConnectTimeout(24)
>                 .withSocketTimeout(24)
>                 .withUsername(esConnectionParams.getElasticsearchUsername())
>                 .withPassword(esConnectionParams.getElasticsearchPassword()))
>     );
>
> But I am unable to submit the job successfully and am getting the following
> exception in my READ transform:
>
> WARNING: Size estimation of the source failed:
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource@7c974942
> java.net.ConnectException: Operation timed out
> at
> org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:823)
> at org.elasticsearch.client.RestClient.performRequest(RestClient.java:248)
> at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
> at
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.getStats(ElasticsearchIO.java:797)
> at
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.getEstimatedSizeBytes(ElasticsearchIO.java:700)
> at
> org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:77)
> at
> org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:51)
> at
> org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:38)
> at
> org.apache.beam.run

Operation Timed out while ElasticsearchIO read

2021-03-07 Thread Mohil Khare
Hello ElasticSearchIO and beam users/developers,

I am on Beam 2.23.0 and Elasticsearch 6.8.

I have been using elasticsearchIO.write() successfully.
For the first time, I am trying to use elasticsearchIO.read() because I have
a use case where I want to read data from one Elasticsearch cluster, modify
the data, and then write it to another Elasticsearch cluster.

My read transform is very simple:

Pipeline p = input.getPipeline();

return p
    .apply("Read_From_ES", ElasticsearchIO.read()
        .withQuery(query)
        .withBatchSize(50)
        .withScrollKeepalive("3m")
        .withConnectionConfiguration(
            ElasticsearchIO.ConnectionConfiguration.create(
                    esConnectionParams.getElasticsearchEndpoints(), indexName, "_doc")
                .withConnectTimeout(24)
                .withSocketTimeout(24)
                .withUsername(esConnectionParams.getElasticsearchUsername())
                .withPassword(esConnectionParams.getElasticsearchPassword()))
    );

But I am unable to submit the job successfully and am getting the following
exception in my READ transform:

WARNING: Size estimation of the source failed:
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource@7c974942
java.net.ConnectException: Operation timed out
at
org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:823)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:248)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
at
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.getStats(ElasticsearchIO.java:797)
at
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.getEstimatedSizeBytes(ElasticsearchIO.java:700)
at
org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:77)
at
org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:51)
at
org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:38)
at
org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:35)
at
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:484)
at
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
at
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at
org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
at
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:463)
at
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:423)
at
org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:182)
at
org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:888)
at
org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:194)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
at io.prosimo.analytics.beam.datacopy.DataCopy.runDataCopy(DataCopy.java:43)
at io.prosimo.analytics.beam.datacopy.DataCopy.main(DataCopy.java:109)
Caused by: java.net.ConnectException: Operation timed out
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at
org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:174)
at
org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:148)
at
org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
at
org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
at
org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
at java.lang.Thread.run(Thread.java:748)

I tried various values for connectTimeout, socketTimeout, batchSize and
scrollKeepalive, but I still hit the same issue.

Any help would be greatly appreciated.

Thanks and Regards
Mohil


Re: disable ssl check/set verify cert false for elasticsearchIO

2020-09-25 Thread Mohil Khare
Great.. Thanks a lot for the pointers Luke.

Regards
Mohil

On Fri, Sep 25, 2020 at 9:23 AM Luke Cwik  wrote:

> Yeah, the JvmInitializer would be necessary if you wanted to use a file
> based keystore but I have seen people create in memory keystores[1].
>
> Take a look at the contribution guide[2].
>
> 1: https://gist.github.com/mikeapr4/3b2b5d05bc57640e77d0
> 2: https://beam.apache.org/contribute/
>
> On Fri, Sep 25, 2020 at 9:17 AM Mohil Khare  wrote:
>
>> Hi Luke,
>> Yeah, I looked at the withTrustSelfSignedCerts option, but after looking at
>> the elasticsearchIO code, it seems that flag comes into effect only when
>> "withKeystorePath" is provided.
>> I don't think that can be a GCS path. I believe it is a path on the
>> worker VM, and for that I will have to use a JvmInitializer to add a custom
>> keystore.
>>
>> Regarding "Feel free to create a PR and add any options following the
>> pattern that is already there in the code": what are the guidelines for
>> adding code and creating a PR for the Beam codebase? Can you point me to the
>> relevant document?
>>
>> Thanks and regards
>> Mohil
>>
>>
>> On Fri, Sep 25, 2020 at 8:45 AM Luke Cwik  wrote:
>>
>>> It doesn't look like it based upon the javadoc:
>>>
>>> https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.ConnectionConfiguration.html
>>>
>>> You can allow self signed certs which might be enough for you though:
>>>
>>> https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.ConnectionConfiguration.html#withTrustSelfSignedCerts-boolean-
>>>
>>> Feel free to create a PR and add any options following the pattern that
>>> is already there in the code.
>>>
>>> On Thu, Sep 24, 2020 at 8:36 PM Mohil Khare  wrote:
>>>
>>>> Hello team and elasticSearchIO users,
>>>>
>>>> I am using beam java sdk 2.23.0 with elastic search as one of the sinks.
>>>>
>>>> Is there any way to turn off ssl check i.e. set cert verify false for
>>>> https connection with elasticsearchIO ? I know using regular clients, you
>>>> can do that. But can we achieve the same using elasticsearchIO ?
>>>>
>>>> Thanks and regards
>>>> Mohil
>>>>
>>>


Re: disable ssl check/set verify cert false for elasticsearchIO

2020-09-25 Thread Mohil Khare
Hi Luke,
Yeah, I looked at the withTrustSelfSignedCerts option, but after looking at
the elasticsearchIO code, it seems that flag comes into effect only when
"withKeystorePath" is provided.
I don't think that can be a GCS path. I believe it is a path on the worker
VM, and for that I will have to use a JvmInitializer to add a custom keystore.

Regarding "Feel free to create a PR and add any options following the
pattern that is already there in the code": what are the guidelines for
adding code and creating a PR for the Beam codebase? Can you point me to the
relevant document?

Thanks and regards
Mohil
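
For later readers of this archive: below is a minimal, hypothetical sketch of the
JvmInitializer approach discussed in this thread. It assumes the keystore is bundled in
the pipeline jar as a resource named es-keystore.jks and is staged to /tmp on each worker;
the class name, resource name, target path, and password are illustrative assumptions,
not ElasticsearchIO requirements.

// Hypothetical sketch: stage a keystore from the job jar onto the worker's local disk
// before processing starts, so ElasticsearchIO's withKeystorePath() can reference it.
import com.google.auto.service.AutoService;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import org.apache.beam.sdk.harness.JvmInitializer;
import org.apache.beam.sdk.options.PipelineOptions;

@AutoService(JvmInitializer.class)
public class EsKeystoreInitializer implements JvmInitializer {
  @Override
  public void beforeProcessing(PipelineOptions options) {
    try (InputStream in =
        getClass().getClassLoader().getResourceAsStream("es-keystore.jks")) {
      // Copy the keystore shipped inside the job jar to a fixed local path on the worker.
      Files.copy(in, Paths.get("/tmp/es-keystore.jks"), StandardCopyOption.REPLACE_EXISTING);
    } catch (Exception e) {
      throw new RuntimeException("Failed to stage Elasticsearch keystore on worker", e);
    }
  }
}

The connection configuration could then point at the staged file, for example
.withKeystorePath("/tmp/es-keystore.jks").withKeystorePassword("...").withTrustSelfSignedCerts(true).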


On Fri, Sep 25, 2020 at 8:45 AM Luke Cwik  wrote:

> It doesn't look like it based upon the javadoc:
>
> https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.ConnectionConfiguration.html
>
> You can allow self signed certs which might be enough for you though:
>
> https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.ConnectionConfiguration.html#withTrustSelfSignedCerts-boolean-
>
> Feel free to create a PR and add any options following the pattern that is
> already there in the code.
>
> On Thu, Sep 24, 2020 at 8:36 PM Mohil Khare  wrote:
>
>> Hello team and elasticSearchIO users,
>>
>> I am using beam java sdk 2.23.0 with elastic search as one of the sinks.
>>
>> Is there any way to turn off ssl check i.e. set cert verify false for
>> https connection with elasticsearchIO ? I know using regular clients, you
>> can do that. But can we achieve the same using elasticsearchIO ?
>>
>> Thanks and regards
>> Mohil
>>
>


disable ssl check/set verify cert false for elasticsearchIO

2020-09-24 Thread Mohil Khare
Hello team and ElasticsearchIO users,

I am using Beam Java SDK 2.23.0 with Elasticsearch as one of the sinks.

Is there any way to turn off the SSL check, i.e. set certificate verification to
false, for HTTPS connections with elasticsearchIO? I know regular clients can do
that. But can we achieve the same using elasticsearchIO?

Thanks and regards
Mohil


Re: Issue with Checkpointing in dataflow using BigQuery

2020-09-21 Thread Mohil Khare
Hello Luke,
Thanks for your reply.

1. Yes, we do use the update option for updating our pipeline, and by doing so
we don't worry about losing state, since we don't need to restart the Beam
job. But I think if we are making major changes, such as changes to the
windowing or triggers, then we have to restart the pipeline in order to avoid
unpredictable results. Still, I agree the update option works fine most of
the time.

2. I also use session windowing in some of the transforms. I could use it here
as well, but as I mentioned, there is no deterministic inactivity (gap) period
that can determine a session. I would have to come up with a very large gap
period after which a new session is guaranteed to start. And again, this won't
solve the problem when we have to restart Beam (when --update can't work).

3. Yeah, you are correct, it's a simple KV put and get operation for
checkpointing. We have been using BigQuery in our system, so we just decided
to leverage that, but I can explore Bigtable or other options. Thanks for
suggesting these options.

Thanks and Regards
Mohil

On Mon, Sep 21, 2020 at 9:37 AM Luke Cwik  wrote:

> Have you tried doing a pipeline update with Dataflow[1]? If yes, what
> prevented you from using this all the time?
>
> Some users have been able to use session windows to solve problems in this
> space, have you explored this[2]? If yes, what didn't work for you?
>
> It seems as though you are doing a bunch of nosql like queries (e.g. save
> data for key X, load data for key Y) and not doing complex aggregations or
> needing multi-key transactions, have you considered using a datastore that
> is designed for this (e.g. Cloud Bigtable/Cloud Firestore/...)[3]?
>
> 1: https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline
> 2:
> https://beam.apache.org/documentation/programming-guide/#provided-windowing-functions
> 3: https://cloud.google.com/products/databases
>
>
> On Sun, Sep 20, 2020 at 7:53 PM Mohil Khare  wrote:
>
>> Hello team,
>>
>> I am using beam java sdk 2.23.0 on dataflow.
>>
>> I have a pipeline where I continuously read from Kafka and run through
>> various transforms and finally emit output to various sinks like Elastic
>> Search, Big Query, GCS buckets etc.
>>
>> There are few transforms where I maintain state of input KV pairs (using
>> Stateful Beam model) and update the incoming data based on metadata
>> maintained in state.
>>
>> Now the use case is that each of those KV pairs belong to some user
>> activity and we don't know how long this activity will last. It can run
>> from a few hours to a few days.  And in between if the beam runs into some
>> issue or for some maintenance we need to restart the beam job, we should be
>> able to retain the state.
>>
>> In order to accommodate the above requirement without maintaining a
>> global window, I maintain a fixed window of 24hrs and at window expiry, I
>> do Checkpointing by writing the state into the BigQuery table. Now in a new
>> window, if code doesn't find a state for a key, it does a Big Query read
>> and reload the state so that state is again maintained for a new window.
>>
>> My above implementation works fine as long as we have few keys to handle.
>> With a lot of user traffic which results in a lot of KV pairs, sometimes
>> leads to a lot of parallel Big Query read of state at the start of a new
>> window.  Since in our system we also have other modules, cloud functions
>> etc that keep reading from Big Query, sometime under heavy load, dataflow
>> receiving following exception while reading state/checkpoint from Big Query:
>>
>> exception: "com.google.cloud.bigquery.BigQueryException: Job exceeded
>> rate limits: Your project_and_region exceeded
>>
>> We have asked GCP folks to increase the limit, but then as per them it is
>> not good for performance and also increases cost.
>>
>> My question is:
>> *Is my above approach of checkpointing with Big Query correct ? Can
>> someone suggest a better approach for checkpointing in case of dataflow?*
>>
>> Thanks and Regards
>> Mohil
>>
>>
>>
>>
>>
>>
>>


Issue with Checkpointing in dataflow using BigQuery

2020-09-20 Thread Mohil Khare
Hello team,

I am using beam java sdk 2.23.0 on dataflow.

I have a pipeline where I continuously read from Kafka and run through
various transforms and finally emit output to various sinks like Elastic
Search, Big Query, GCS buckets etc.

There are a few transforms where I maintain state for input KV pairs (using
the stateful Beam model) and update the incoming data based on metadata
maintained in that state.

Now, the use case is that each of those KV pairs belongs to some user
activity, and we don't know how long this activity will last. It can run
from a few hours to a few days. And if, in between, Beam runs into some
issue, or we need to restart the Beam job for maintenance, we should be
able to retain the state.

In order to accommodate the above requirement without maintaining a global
window, I use a fixed window of 24 hours and, at window expiry, I checkpoint
by writing the state into a BigQuery table. Then, in a new window, if the
code doesn't find state for a key, it does a BigQuery read and reloads the
state, so that state is again maintained for the new window.
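
A minimal sketch of this pattern, assuming a Beam version where @OnWindowExpiration is
available, that the per-key state is a single String, that the usual Beam state imports
(DoFn, StateSpec, StateSpecs, ValueState, StringUtf8Coder) are in scope, and that
BigQueryStateClient and enrich(...) are hypothetical placeholders for the BigQuery
access and the real per-key update:

static class CheckpointingDoFn extends DoFn<KV<String, String>, KV<String, String>> {

  @StateId("key")
  private final StateSpec<ValueState<String>> keySpec = StateSpecs.value(StringUtf8Coder.of());

  @StateId("meta")
  private final StateSpec<ValueState<String>> metaSpec = StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("key") ValueState<String> keyState,
      @StateId("meta") ValueState<String> metaState) {
    keyState.write(c.element().getKey());
    String meta = metaState.read();
    if (meta == null) {
      // First element for this key in a new 24h window: reload the checkpoint
      // written when the previous window expired.
      meta = BigQueryStateClient.readCheckpoint(c.element().getKey());
      metaState.write(meta);
    }
    // enrich(...) stands in for whatever per-key update the real pipeline performs.
    c.output(KV.of(c.element().getKey(), enrich(c.element().getValue(), meta)));
  }

  @OnWindowExpiration
  public void onWindowExpiration(
      @StateId("key") ValueState<String> keyState,
      @StateId("meta") ValueState<String> metaState) {
    // The 24h window is expiring: persist the state so the next window can reload it.
    String key = keyState.read();
    String meta = metaState.read();
    if (key != null && meta != null) {
      BigQueryStateClient.writeCheckpoint(key, meta);
    }
  }
}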

My implementation above works fine as long as we have few keys to handle.
Heavy user traffic, which results in a lot of KV pairs, sometimes leads to
many parallel BigQuery reads of state at the start of a new window. Since
our system also has other modules, cloud functions etc. that keep reading
from BigQuery, under heavy load Dataflow sometimes receives the following
exception while reading state/checkpoints from BigQuery:

exception: "com.google.cloud.bigquery.BigQueryException: Job exceeded rate
limits: Your project_and_region exceeded

We have asked GCP folks to increase the limit, but according to them that is
not good for performance and also increases cost.

My question is:
*Is my above approach of checkpointing with BigQuery correct? Can someone
suggest a better approach for checkpointing in the case of Dataflow?*

Thanks and Regards
Mohil


Re: Need Support for ElasticSearch 7.x for beam

2020-08-24 Thread Mohil Khare
hello Kyle,

Thanks a lot for your prompt reply. Hmm, strange: I think I was getting
an error that the version was not supported. Not sure if I had updated my
Beam version the last time I tried with ES 7.x.
Let me try it again and let you know.

Thanks and Regards
Mohil

On Mon, Aug 24, 2020 at 11:54 AM Kyle Weaver  wrote:

> This ticket indicates Elasticsearch 7.x has been supported since Beam
> 2.19: https://issues.apache.org/jira/browse/BEAM-5192
>
> Are there any specific features you need that aren't supported?
>
> On Mon, Aug 24, 2020 at 11:33 AM Mohil Khare  wrote:
>
>> Hello,
>>
>> Firstly I am on java sdk 2.23.0 and we heavily use Elasticsearch as one
>> of our sinks.
>>
>> It's been a while since beam got upgraded to support elasticsearch
>> version greater than 6.x.
>> Elasticsearch has now moved on to 7.x and we want to use some of their
>> new security features.
>>
>> I want to know which version of beam will support elasticsearch version
>> 7.x.
>> I see the following commit, but it looks like it hasn't been merged with
>> master yet.
>>
>> https://github.com/apache/beam/pull/10023
>>
>>
>> Thanks and regards
>> Mohil
>>
>>
>>
>>


Need Support for ElasticSearch 7.x for beam

2020-08-24 Thread Mohil Khare
Hello,

Firstly, I am on Java SDK 2.23.0 and we heavily use Elasticsearch as one of
our sinks.

It's been a while since Beam was upgraded to support Elasticsearch versions
greater than 6.x.
Elasticsearch has now moved on to 7.x and we want to use some of its new
security features.

I want to know which version of Beam will support Elasticsearch 7.x.
I see the following pull request, but it looks like it hasn't been merged to
master yet.

https://github.com/apache/beam/pull/10023


Thanks and regards
Mohil


Removing Duplicates from Sliding Window

2020-08-18 Thread Mohil Khare
Hello All,

Firstly, I am using Beam Java SDK 2.23.0.

I have a use case where I continuously read streaming data from Kafka and
dump output to Elasticsearch after applying a bunch of PTransforms.

One such transform depends on the number of requests we have seen for a
particular key in the last one hour (the hour preceding the time at which
the data is being processed).

I have defined the transform chain like the following:

ReadTransform
 ---> Apply_Sliding_Window_1Hr_Every_1Min
 ---> Generate_KV_Transform
 ---> StatefulProcessingTransform (where state maintains the number of requests seen so far)
 ---> Apply_Fixed_Window_5Secs
 ---> GroupByKey.create()
 ---> Remove_DupFromSlidingWindows
 ---> WriteLogToES


1. I apply a sliding window of 1 hour (every 1 min) so that I can get a
snapshot of the last 1 hr.
2. The stateful processing transform adds the last-hour request count based on
state maintained per key per window.
3. NOW, since multiple overlapping windows can process and produce the same log
msg, I take the following additional steps to emit only one unique log (a
sketch of the dedup step follows this list):
   a) First, in stateful processing, I add the current window's start time to
each log.
   b) Apply a fixed window of 5 secs.
   c) GroupByKey.create(), which should produce at most 60 outputs.
   d) In Remove_DupFromSlidingWindows I sort the Iterable based on each
log's window start time (added in (a)) and emit the log from the window with
the earliest timestamp, which gives the most accurate snapshot of the last
1 hour req count.
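
A minimal sketch of the Remove_DupFromSlidingWindows step in 3(d), assuming a Log POJO
with a getWindowStartMillis() accessor holding the timestamp added in 3(a) (the type and
field name are illustrative). A full sort isn't strictly needed; a single pass keeping
the minimum is enough:

static class RemoveDupFromSlidingWindows extends DoFn<KV<String, Iterable<Log>>, Log> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    Log earliest = null;
    // Among the (up to 60) copies produced by overlapping sliding windows,
    // keep only the one whose sliding window started earliest, per step (d).
    for (Log log : c.element().getValue()) {
      if (earliest == null || log.getWindowStartMillis() < earliest.getWindowStartMillis()) {
        earliest = log;
      }
    }
    if (earliest != null) {
      c.output(earliest);
    }
  }
}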

I am not sure whether, performance-wise, I am solving my use case correctly. I
want to avoid these additional steps, especially the GroupByKey.create() and
the subsequent dedup.

Is there any better way of solving this use case ?
Any advice will be greatly appreciated.

Thanks and Regards
Mohil


ElasticsearchIO: Connection closed and Cannot get Elasticsearch version exceptions

2020-08-09 Thread Mohil Khare
Hello All,

I am using Apache Beam Java SDK 2.19 and ElasticsearchIO with Elasticsearch 6.x.

I keep getting the following exception while dumping streaming logs to ES:

java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException:
java.lang.IllegalArgumentException: Cannot get Elasticsearch version

at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply (IntrinsicMapTaskExecutorFactory.java:194)
at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply (IntrinsicMapTaskExecutorFactory.java:165)
at org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply (Networks.java:63)
at org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply (Networks.java:50)
at org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes (Networks.java:87)
at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create (IntrinsicMapTaskExecutorFactory.java:125)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process (StreamingDataflowWorker.java:1266)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100 (StreamingDataflowWorker.java:152)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run (StreamingDataflowWorker.java:1073)
at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)

Caused by: org.apache.beam.sdk.util.UserCodeException:
java.lang.IllegalArgumentException: Cannot get Elasticsearch version

at org.apache.beam.sdk.util.UserCodeException.wrap (UserCodeException.java:34)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup (Unknown Source)
at org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy (DoFnInstanceManagers.java:80)
at org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek (DoFnInstanceManagers.java:62)

Session Windowing followed by CoGroupByKey

2020-08-06 Thread Mohil Khare
Hello All,

I need advice on whether session windowing followed by CoGroupByKey is a
correct way to solve my use case, and if yes, whether I am using it correctly.
Please note that I am using Java SDK 2.19 on Google Dataflow.

I have two streams of data coming from two different Kafka topics, and I
need to correlate them using the common key present in both of them. I
expect all the logs for a key to arrive within 90 seconds in both topics,
and hence I decided to use a session window.

1. Read data from the Kafka topics like the following:

PCollection<KV<String, POJO1>> collection1 =
    p.apply("Read_From_Kafka", KafkaIO.<String, byte[]>read()
            .withBootstrapServers(servers)
            .withTopics(Arrays.asList("topic1"))
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            .withConsumerConfigUpdates(kafkaConsumerProperties)
            .withConsumerFactoryFn(consumerFactoryObj)
            .commitOffsetsInFinalize())
     .apply("Applying_Fixed_Window",
            Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .withAllowedLateness(Duration.standardSeconds(360))
                .discardingFiredPanes())
     .apply("Convert_KafkaRecord_To_PCollection", ParDo.of(new ParseLogsPOJO1()));


PCollection<KV<String, POJO2>> collection2 =
    p.apply("Read_From_Kafka", KafkaIO.<String, byte[]>read()
            .withBootstrapServers(servers)
            .withTopics(Arrays.asList("topic2"))
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            .withConsumerConfigUpdates(kafkaConsumerProperties)
            .withConsumerFactoryFn(consumerFactoryObj)
            .commitOffsetsInFinalize())
     .apply("Applying_Fixed_Window",
            Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .withAllowedLateness(Duration.standardSeconds(360))
                .discardingFiredPanes())
     .apply("Convert_KafkaRecord_To_PCollection", ParDo.of(new ParseLogsPOJO2()));


2. Put each of the above collections in a session window with a gap period of
90 secs:

PCollection<KV<String, POJO1>> sessionWindowedPOJO1 =
    collection1
        .apply("Applying_Session_Window",
            Window.<KV<String, POJO1>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes());

PCollection<KV<String, POJO2>> sessionWindowedPOJO2 =
    collection2
        .apply("Applying_Session_Window",
            Window.<KV<String, POJO2>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes());


3. CoGroupByKey and get the correlated logs:

PCollection coGbkLogs =
    KeyedPCollectionTuple.of("tag1", sessionWindowedPOJO1)
        .and("tag2", sessionWindowedPOJO2)
        .apply("CoGroupByMyKey", CoGroupByKey.create())
        .apply("Correlate_Logs_PerLogID", ParDo.of(new Correlate()));




   Is this a correct way to solve my use case?


Looking forward to hearing from someone soon.


Thanks and Regards

Mohil


Re: Intermittent Slowness with kafkaIO read

2020-08-05 Thread Mohil Khare
I also have a question: could wrong windowing be messing up the received
timestamp?

Thanks and regards
Mohil

On Wed, Aug 5, 2020 at 1:19 PM Mohil Khare  wrote:

> Just to let you know, this is how I setup kafkaIO read:
>
> p
>
> .apply("Read_From_Kafka", KafkaIO.read()
>
> .withBootstrapServers(servers)
>
> .withTopics(Arrays.asList(“topic1”, “topic2”))
>
> .withKeyDeserializer(StringDeserializer.class)
>
> .withValueDeserializer(ByteArrayDeserializer.class)
>
> .withConsumerConfigUpdates(kafkaConsumerProperties)
>
> .withConsumerFactoryFn(consumerFactoryObj)
>
> .commitOffsetsInFinalize())
>
> .apply("Applying_Fixed_Window", Window. byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>
> .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>
> .withAllowedLateness(Duration.standardSeconds(360))
>
> .discardingFiredPanes())
>
> .apply("Convert_KafkaRecord_To_PCollection",
>
> ParDo.of(new ParseLogs()));
>
>
>
> Where kafkaConsumerProperties is following map
>
>
> kafkaConsumerProperties.put("security.protocol", "SSL");
>
> kafkaConsumerProperties.put("auto.offset.reset", "latest");
>
> kafkaConsumerProperties.put("group.id", “consumer1”);
>
> kafkaConsumerProperties.put("default.api.timeout.ms", 18);
>
>
> And inside consumerFactoryObj I set up SSL keystores
>
>
> Thanks and Regards
>
> Mohil
>
> On Wed, Aug 5, 2020 at 12:59 PM Mohil Khare  wrote:
>
>> Hello,
>>
>> I am using Beam java Sdk 2.19 on dataflow. We have a system where log
>> shipper continuously emit logs to kafka and beam read logs using KafkaIO.
>>
>> Sometimes I am seeing slowness on KafkaIO read with one of the topics
>> (probably during peak traffic periods), where there is a 2-3 minute gap between
>> the record timestamp and the time when Beam reads the log. For instance:
>>
>> 2020-08-05 12:46:23.826 PDT
>>
>>  {"@timestamp":1596656684.274594,"time":"2020-08-05T19:44:44.274594282Z”,
>> “data” : data}, offset: 2148857. timestamp: 1596656685005
>>
>>
>>
>> If you convert record timestamp (1596656685005) which is in epoch ms to
>> PDT, you will see approx 2 mins difference between  this and 2020-08-05
>> 12:46:23.826 PDT (time when beam actually reads the data).
>>
>> So one way of achieving horizontal scaling here is by increasing the
>> number of partitions on the Kafka broker. What can be done on the Beam side,
>> i.e. the KafkaIO side, to tackle this slowness? Any suggestions?
>>
>> Thanks and regards
>> Mohil
>>
>>
>>
>>


Re: Intermittent Slowness with kafkaIO read

2020-08-05 Thread Mohil Khare
Just to let you know, this is how I set up the KafkaIO read:

p.apply("Read_From_Kafka", KafkaIO.<String, byte[]>read()
        .withBootstrapServers(servers)
        .withTopics(Arrays.asList("topic1", "topic2"))
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withConsumerConfigUpdates(kafkaConsumerProperties)
        .withConsumerFactoryFn(consumerFactoryObj)
        .commitOffsetsInFinalize())
 .apply("Applying_Fixed_Window",
        Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.standardSeconds(360))
            .discardingFiredPanes())
 .apply("Convert_KafkaRecord_To_PCollection", ParDo.of(new ParseLogs()));



Where kafkaConsumerProperties is the following map:


kafkaConsumerProperties.put("security.protocol", "SSL");

kafkaConsumerProperties.put("auto.offset.reset", "latest");

kafkaConsumerProperties.put("group.id", “consumer1”);

kafkaConsumerProperties.put("default.api.timeout.ms", 18);


And inside consumerFactoryObj I set up SSL keystores.
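
For context, a minimal sketch of what a consumer factory that wires in SSL keystores
could look like (the keystore/truststore paths and passwords are placeholders, and
getting those files onto the worker, e.g. from GCS or the job jar, is assumed to
happen elsewhere):

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class SslConsumerFactoryFn
    implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
  @Override
  public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
    // Start from the config KafkaIO passes in, then add the standard Kafka SSL client settings.
    Map<String, Object> sslConfig = new HashMap<>(config);
    sslConfig.put("ssl.truststore.location", "/tmp/kafka.truststore.jks");
    sslConfig.put("ssl.truststore.password", "changeit");
    sslConfig.put("ssl.keystore.location", "/tmp/kafka.keystore.jks");
    sslConfig.put("ssl.keystore.password", "changeit");
    return new KafkaConsumer<>(sslConfig);
  }
}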


Thanks and Regards

Mohil

On Wed, Aug 5, 2020 at 12:59 PM Mohil Khare  wrote:

> Hello,
>
> I am using Beam java Sdk 2.19 on dataflow. We have a system where log
> shipper continuously emit logs to kafka and beam read logs using KafkaIO.
>
> Sometimes I am seeing slowness on KafkaIO read with one of the topics
> (probably during peak traffic periods), where there is a 2-3 minute gap between
> the record timestamp and the time when Beam reads the log. For instance:
>
> 2020-08-05 12:46:23.826 PDT
>
>  {"@timestamp":1596656684.274594,"time":"2020-08-05T19:44:44.274594282Z”,
> “data” : data}, offset: 2148857. timestamp: 1596656685005
>
>
>
> If you convert record timestamp (1596656685005) which is in epoch ms to
> PDT, you will see approx 2 mins difference between  this and 2020-08-05
> 12:46:23.826 PDT (time when beam actually reads the data).
>
> So one way of achieving horizontal scaling here is by increasing the
> number of partitions on the Kafka broker. What can be done on the Beam side,
> i.e. the KafkaIO side, to tackle this slowness? Any suggestions?
>
> Thanks and regards
> Mohil
>
>
>
>


Re: Exceptions: Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow.

2020-08-04 Thread Mohil Khare
Thanks a lot Luke..

Regards
Mohil

On Tue, Aug 4, 2020 at 12:01 PM Luke Cwik  wrote:

> BEAM-6855 is still open and I updated it linking to this thread that a
> user is still being impacted.
>
> On Tue, Aug 4, 2020 at 10:20 AM Mohil Khare  wrote:
>
>> yeah .. looks like a bug still exists.
>>
>> So how does this work ? Shall I open a new Jira ?
>>
>> Thanks and regards
>> Mohil
>>
>> On Thu, Jul 30, 2020 at 10:39 PM Reuven Lax  wrote:
>>
>>> I believe that the person trying to fix BEAM-6855 was unable to
>>> reproduce it in test, and therefore assumed that the bug was fixed. However
>>> it appears that the bug still exists.
>>>
>>> On Wed, Jul 29, 2020 at 10:36 AM Kenneth Knowles 
>>> wrote:
>>>
>>>> Hi Mohil,
>>>>
>>>> It helps also to tell us what version of Beam you are using and some
>>>> more details. This looks related to
>>>> https://issues.apache.org/jira/browse/BEAM-6855 which claims to be
>>>> resolved in 2.17.0
>>>>
>>>> Kenn
>>>>
>>>> On Mon, Jul 27, 2020 at 11:47 PM Mohil Khare  wrote:
>>>>
>>>>> Hello all,
>>>>>
>>>>> I think I found the reason for the issue.  Since the exception was
>>>>> thrown by StreamingSideInputDoFnRunner.java, I realize that I recently
>>>>> added side input to one of my ParDo that does stateful transformations.
>>>>> It looks like there is some issue when you add side input (My side
>>>>> input was coming via Global window to ParDo in a Fixed Window) to stateful
>>>>> DoFn.
>>>>>
>>>>> As a work around, instead of adding side input to stateful ParDo, I
>>>>> introduced another ParDo  that enriches streaming data with side input
>>>>> before flowing into stateful DoFn. That seems to have fixed the problem.
>>>>>
>>>>>
>>>>> Thanks and regards
>>>>> Mohil
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Jul 27, 2020 at 10:50 AM Mohil Khare  wrote:
>>>>>
>>>>>> Hello All,
>>>>>>
>>>>>> Any idea how to debug this and find out which stage, which DoFn or
>>>>>> which side input is causing the problem?
>>>>>> Do I need to override OnTimer with every DoFn to avoid this problem?
>>>>>> I thought that some uncaught exceptions were causing this and added
>>>>>> various checks and exception handling in all DoFn and still seeing this
>>>>>> issue.
>>>>>> It has been driving me nuts. And now forget DRAIN, it happens during
>>>>>> normal functioning as well. Any help would be appreciated.
>>>>>>
>>>>>> java.lang.UnsupportedOperationException: Attempt to deliver a timer
>>>>>> to a DoFn, but timers are not supported in Dataflow.
>>>>>>
>>>>>> at org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.onTimer (StreamingSideInputDoFnRunner.java:86)
>>>>>> at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processUserTimer (SimpleParDoFn.java:360)
>>>>>> at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$600 (SimpleParDoFn.java:73)
>>>>>> at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$1.processTimer (SimpleParDoFn.java:444)
>>>>>> ...

Re: Exceptions: Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow.

2020-08-04 Thread Mohil Khare
yeah .. looks like a bug still exists.

So how does this work ? Shall I open a new Jira ?

Thanks and regards
Mohil

On Thu, Jul 30, 2020 at 10:39 PM Reuven Lax  wrote:

> I believe that the person trying to fix BEAM-6855 was unable to reproduce
> it in test, and therefore assumed that the bug was fixed. However it
> appears that the bug still exists.
>
> On Wed, Jul 29, 2020 at 10:36 AM Kenneth Knowles  wrote:
>
>> Hi Mohil,
>>
>> It helps also to tell us what version of Beam you are using and some more
>> details. This looks related to
>> https://issues.apache.org/jira/browse/BEAM-6855 which claims to be
>> resolved in 2.17.0
>>
>> Kenn
>>
>> On Mon, Jul 27, 2020 at 11:47 PM Mohil Khare  wrote:
>>
>>> Hello all,
>>>
>>> I think I found the reason for the issue.  Since the exception was
>>> thrown by StreamingSideInputDoFnRunner.java, I realize that I recently
>>> added side input to one of my ParDo that does stateful transformations.
>>> It looks like there is some issue when you add side input (My side input
>>> was coming via Global window to ParDo in a Fixed Window) to stateful DoFn.
>>>
>>> As a work around, instead of adding side input to stateful ParDo, I
>>> introduced another ParDo  that enriches streaming data with side input
>>> before flowing into stateful DoFn. That seems to have fixed the problem.
>>>
>>>
>>> Thanks and regards
>>> Mohil
>>>
>>>
>>>
>>> On Mon, Jul 27, 2020 at 10:50 AM Mohil Khare  wrote:
>>>
>>>> Hello All,
>>>>
>>>> Any idea how to debug this and find out which stage, which DoFn or
>>>> which side input is causing the problem?
>>>> Do I need to override OnTimer with every DoFn to avoid this problem?
>>>> I thought that some uncaught exceptions were causing this and added
>>>> various checks and exception handling in all DoFn and still seeing this
>>>> issue.
>>>> It has been driving me nuts. And now forget DRAIN, it happens during
>>>> normal functioning as well. Any help would be appreciated.
>>>>
>>>> java.lang.UnsupportedOperationException: Attempt to deliver a timer to
>>>> a DoFn, but timers are not supported in Dataflow.
>>>>
>>>> at org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.onTimer (StreamingSideInputDoFnRunner.java:86)
>>>> at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processUserTimer (SimpleParDoFn.java:360)
>>>> at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$600 (SimpleParDoFn.java:73)
>>>> at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$1.processTimer (SimpleParDoFn.java:444)
>>>> at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers (SimpleParDoFn.java:473)
>>>> at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers (SimpleParDoFn.java:353)
>>>> ...

Re: Exceptions: Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow.

2020-07-29 Thread Mohil Khare
Hi Kenneth,
I am on beam java sdk 2.19 With enableStreamingEngine set to true and using
default machine type (n1-standard-2).

Thanks and regards
Mohil



On Wed, Jul 29, 2020 at 10:36 AM Kenneth Knowles  wrote:

> Hi Mohil,
>
> It helps also to tell us what version of Beam you are using and some more
> details. This looks related to
> https://issues.apache.org/jira/browse/BEAM-6855 which claims to be
> resolved in 2.17.0
>
> Kenn
>
> On Mon, Jul 27, 2020 at 11:47 PM Mohil Khare  wrote:
>
>> Hello all,
>>
>> I think I found the reason for the issue.  Since the exception was thrown
>> by StreamingSideInputDoFnRunner.java, I realize that I recently added side
>> input to one of my ParDo that does stateful transformations.
>> It looks like there is some issue when you add side input (My side input
>> was coming via Global window to ParDo in a Fixed Window) to stateful DoFn.
>>
>> As a work around, instead of adding side input to stateful ParDo, I
>> introduced another ParDo  that enriches streaming data with side input
>> before flowing into stateful DoFn. That seems to have fixed the problem.
>>
>>
>> Thanks and regards
>> Mohil
>>
>>
>>
>> On Mon, Jul 27, 2020 at 10:50 AM Mohil Khare  wrote:
>>
>>> Hello All,
>>>
>>> Any idea how to debug this and find out which stage, which DoFn or which
>>> side input is causing the problem?
>>> Do I need to override OnTimer with every DoFn to avoid this problem?
>>> I thought that some uncaught exceptions were causing this and added
>>> various checks and exception handling in all DoFn and still seeing this
>>> issue.
>>> It has been driving me nuts. And now forget DRAIN, it happens during
>>> normal functioning as well. Any help would be appreciated.
>>>
>>> java.lang.UnsupportedOperationException: Attempt to deliver a timer to a
>>> DoFn, but timers are not supported in Dataflow.
>>>
>>> at org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.onTimer (StreamingSideInputDoFnRunner.java:86)
>>> at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processUserTimer (SimpleParDoFn.java:360)
>>> at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$600 (SimpleParDoFn.java:73)
>>> at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$1.processTimer (SimpleParDoFn.java:444)
>>> at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers (SimpleParDoFn.java:473)
>>> at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers (SimpleParDoFn.java:353)
>>> at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish (ParDoOperation.java:52)
>>> ...

Re: Exceptions: Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow.

2020-07-28 Thread Mohil Khare
Hello all,

I think I found the reason for the issue. Since the exception was thrown by
StreamingSideInputDoFnRunner.java, I realized that I recently added a side
input to one of my ParDos that does stateful transformations.
It looks like there is some issue when you add a side input (my side input
was coming via a global window into a ParDo in a fixed window) to a stateful DoFn.

As a workaround, instead of adding the side input to the stateful ParDo, I
introduced another ParDo that enriches the streaming data with the side input
before it flows into the stateful DoFn. That seems to have fixed the problem.
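
A minimal sketch of this workaround (the element types, the metadataSource/metadataView
names, and MyStatefulDoFn are illustrative placeholders, not the actual pipeline code):

PCollectionView<Map<String, String>> metadataView =
    metadataSource.apply("As_Map_Side_Input", View.<String, String>asMap());

PCollection<KV<String, String>> enriched =
    input.apply("Enrich_With_Side_Input",
        ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Consume the side input here, in a plain stateless ParDo.
            Map<String, String> metadata = c.sideInput(metadataView);
            String extra = metadata.getOrDefault(c.element().getKey(), "");
            c.output(KV.of(c.element().getKey(), c.element().getValue() + "|" + extra));
          }
        }).withSideInputs(metadataView));

// The stateful DoFn now receives already-enriched elements and takes no side inputs,
// which keeps it off the StreamingSideInputDoFnRunner timer path.
PCollection<KV<String, String>> out =
    enriched.apply("Stateful_Transform", ParDo.of(new MyStatefulDoFn()));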


Thanks and regards
Mohil



On Mon, Jul 27, 2020 at 10:50 AM Mohil Khare  wrote:

> Hello All,
>
> Any idea how to debug this and find out which stage, which DoFn or which
> side input is causing the problem?
> Do I need to override OnTimer with every DoFn to avoid this problem?
> I thought that some uncaught exceptions were causing this and added
> various checks and exception handling in all DoFn and still seeing this
> issue.
> It has been driving me nuts. And now forget DRAIN, it happens during
> normal functioning as well. Any help would be appreciated.
>
> java.lang.UnsupportedOperationException: Attempt to deliver a timer to a
> DoFn, but timers are not supported in Dataflow.
>
> at org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.onTimer (StreamingSideInputDoFnRunner.java:86)
> at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processUserTimer (SimpleParDoFn.java:360)
> at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$600 (SimpleParDoFn.java:73)
> at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$1.processTimer (SimpleParDoFn.java:444)
> at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers (SimpleParDoFn.java:473)
> at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers (SimpleParDoFn.java:353)
> at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish (ParDoOperation.java:52)
> at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute (MapTaskExecutor.java:85)
> at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process (StreamingDataflowWorker.java:1350)
> at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100 (StreamingDataflowWorker.java:152)
> ...

Exceptions: Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow.

2020-07-27 Thread Mohil Khare
Hello All,

Any idea how to debug this and find out which stage, which DoFn, or which
side input is causing the problem?
Do I need to override OnTimer in every DoFn to avoid this problem?
I thought that some uncaught exceptions were causing this and added various
checks and exception handling in all DoFns, but I am still seeing this issue.
It has been driving me nuts. And now, forget DRAIN, it happens during normal
operation as well. Any help would be appreciated.

java.lang.UnsupportedOperationException: Attempt to deliver a timer to a
DoFn, but timers are not supported in Dataflow.

at org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.onTimer (StreamingSideInputDoFnRunner.java:86)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processUserTimer (SimpleParDoFn.java:360)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$600 (SimpleParDoFn.java:73)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$1.processTimer (SimpleParDoFn.java:444)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers (SimpleParDoFn.java:473)
at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers (SimpleParDoFn.java:353)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish (ParDoOperation.java:52)
at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute (MapTaskExecutor.java:85)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process (StreamingDataflowWorker.java:1350)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100 (StreamingDataflowWorker.java:152)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run (StreamingDataflowWorker.java:1073)
at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
at java.lang.Thread.run (Thread.java:748)

Thanks
Mohil


On Sun, Jul 26, 2020 at 1:50 PM Mohil Khare  wrote:

> and it seems be due  to TimerType User
>
> Thanks
> Mohil
>
> On Sun, Jul 26, 2020 at 1:42 PM Mohil Khare  wrote:
>
>> Hello,
>>
>> I was looking at source code of
>> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
>> ,
>>
>> It seems the default implementation of OnTimer is to throw (Attempt to
>> deliver a 

Re: On DRAIN: Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow.

2020-07-26 Thread Mohil Khare
And it seems to be due to TimerType User.

Thanks
Mohil

On Sun, Jul 26, 2020 at 1:42 PM Mohil Khare  wrote:

> Hello,
>
> I was looking at source code of
> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
> ,
>
> It seems the default implementation of OnTimer is to throw (Attempt to
> deliver a timer to a DoFn, but timers are not supported in Dataflow).
>
> Do you know under what circumstances, My code might be throwing this ?
> Not sure if its some issue in 2.19 which might have  been fixed now with
> 2.22
>
> Thanks and Regards
> Mohil
>
>
> On Fri, Jul 24, 2020 at 5:21 PM Mohil Khare  wrote:
>
>> Actually NOT JUST DRAIN, seeing it during regular RUN as well.  It's
>> getting flooded with this exception
>>
>> Attempt to deliver a timer to a DoFn, but timers are not supported in
>> Dataflow.
>> The changes that I did are the following:
>> 1. Read one set of logs from one of the kafka topics and create KV(ID,
>> log1)
>> 2. Read 2nd set of logs from another kafka topic and create KV(ID, log2)
>> 3. Put above in session window
>> 4. CoGroupByKey both logs
>>
>> Apart from aforementioned exceptions, I am also seeing the following in
>> worker logs. What I have noticed is that it happens when the pipeline is
>> silent for a while. i.e. no new logs to be read from Kafka (Not sure if it
>> is the actual reason).
>>
>> 2020-07-24 17:06:43.532 PDT
>> Execution of work for P188 for key
>> cloud@prosimo.ioHe8fc8079-c844-11ea-a6d5-dabe1eb9c630 failed. Will retry
>> locally.
>>
>> <https://console.cloud.google.com/logs/viewer?advancedFilter=insertId%3D%224729149996009968456:27267:0:42419165%22%20resource.type%3D%22dataflow_step%22%20resource.labels.job_id%3D%222020-07-24_16_23_20-4320159532106629306%22%20(logName%3D%22projects%2Fprosimo-test%2Flogs%2Fdataflow.googleapis.com%252Fworker%22)%20severity%3E%3DERROR%0A(timestamp%3E%222020-07-25T00:06:33.224Z%22%20OR%20(insertId%3E%224729149996009968456:27267:0:42264272%22%20AND%20timestamp%3D%222020-07-25T00:06:33.224Z%22))=true=JUMP_TO_TIME=2020-07-25T00:06:43.532Z=2020-07-25T00:06:43.532Z=prosimo-test>
>> 2020-07-24 17:06:53.863 PDT
>> Uncaught exception:
>>
>>
>>
>> On Fri, Jul 24, 2020 at 2:59 PM Mohil Khare  wrote:
>>
>>> Hello,
>>>
>>> I am on java sdk 2.19 and using dataflow for beam job. I use Timers for
>>> my stateful transformations, but recently I started seeing the following
>>> exception on DRAINING a job. It used to work fine and not sure what changed.
>>>
>>> *java.lang.UnsupportedOperationException:*
>>>
>> *Attempt to deliver a timer to a DoFn, but timers are not supported in
>> Dataflow. *
>>
>>>
>>>1.
>>>   1. atorg.apache.beam.runners.dataflow.worker.
>>>   StreamingSideInputDoFnRunner.onTimer (
>>>   StreamingSideInputDoFnRunner.java:86
>>>   
>>> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-24_14_14_42-5619042994908733980=org%2Fapache.beam.runners.dataflow.worker%2FStreamingSideInputDoFnRunner.java=86=prosimo-test>
>>>   )
>>>   2. atorg.apache.beam.runners.dataflow.worker.
>>>   SimpleParDoFn.processUserTimer (SimpleParDoFn.java:360
>>>   
>>> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-24_14_14_42-5619042994908733980=org%2Fapache.beam.runners.dataflow.worker%2FSimpleParDoFn.java=360=prosimo-test>
>>>   )
>>>   3. atorg.apache.beam.runners.dataflow.worker.
>>>   SimpleParDoFn.access$600 (SimpleParDoFn.java:73
>>>   
>>> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-24_14_14_42-5619042994908733980=org%2Fapache.beam.runners.dataflow.worker%2FSimpleParDoFn.java=73=prosimo-test>
>>>   )
>>>   4. atorg.apache.beam.runners.dataflow.worker.
>>>   SimpleParDoFn$TimerType$1.processTimer (SimpleParDoFn.java:444
>>>   
>>> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-24_14_14_42-5619042994908733980=org%2Fapache.beam.runners.dataflow.worker%2FSimpleParDoFn.java=444=prosimo-test>
>>>   )
>>>   5. atorg.apache.beam.runners.dataflow.worker.
>>>   SimpleParDoFn.processTimers (SimpleParDoFn.java:473
>>>   
>>> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-24_14_14_42-56

Re: On DRAIN: Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow.

2020-07-26 Thread Mohil Khare
Hello,

I was looking at the source code of
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java

It seems the default implementation of OnTimer is to throw (Attempt to
deliver a timer to a DoFn, but timers are not supported in Dataflow).

Do you know under what circumstances my code might be throwing this? I am not
sure if it is an issue in 2.19 that might have been fixed by 2.22.

Thanks and Regards
Mohil


On Fri, Jul 24, 2020 at 5:21 PM Mohil Khare  wrote:

> Actually NOT JUST DRAIN, seeing it during regular RUN as well.  It's
> getting flooded with this exception
>
> Attempt to deliver a timer to a DoFn, but timers are not supported in
> Dataflow.
> The changes that I did are the following:
> 1. Read one set of logs from one of the kafka topics and create KV(ID,
> log1)
> 2. Read 2nd set of logs from another kafka topic and create KV(ID, log2)
> 3. Put above in session window
> 4. CoGroupByKey both logs
>
> Apart from aforementioned exceptions, I am also seeing the following in
> worker logs. What I have noticed is that it happens when the pipeline is
> silent for a while. i.e. no new logs to be read from Kafka (Not sure if it
> is the actual reason).
>
> 2020-07-24 17:06:43.532 PDT
> Execution of work for P188 for key
> cloud@prosimo.ioHe8fc8079-c844-11ea-a6d5-dabe1eb9c630 failed. Will retry
> locally.
>
> <https://console.cloud.google.com/logs/viewer?advancedFilter=insertId%3D%224729149996009968456:27267:0:42419165%22%20resource.type%3D%22dataflow_step%22%20resource.labels.job_id%3D%222020-07-24_16_23_20-4320159532106629306%22%20(logName%3D%22projects%2Fprosimo-test%2Flogs%2Fdataflow.googleapis.com%252Fworker%22)%20severity%3E%3DERROR%0A(timestamp%3E%222020-07-25T00:06:33.224Z%22%20OR%20(insertId%3E%224729149996009968456:27267:0:42264272%22%20AND%20timestamp%3D%222020-07-25T00:06:33.224Z%22))=true=JUMP_TO_TIME=2020-07-25T00:06:43.532Z=2020-07-25T00:06:43.532Z=prosimo-test>
> 2020-07-24 17:06:53.863 PDT
> Uncaught exception:
>
>
>
> On Fri, Jul 24, 2020 at 2:59 PM Mohil Khare  wrote:
>
>> Hello,
>>
>> I am on java sdk 2.19 and using dataflow for beam job. I use Timers for
>> my stateful transformations, but recently I started seeing the following
>> exception on DRAINING a job. It used to work fine and not sure what changed.
>>
>> *java.lang.UnsupportedOperationException:*
>>
> *Attempt to deliver a timer to a DoFn, but timers are not supported in
> Dataflow. *
>
>>
>>1.
>>   1. atorg.apache.beam.runners.dataflow.worker.
>>   StreamingSideInputDoFnRunner.onTimer (
>>   StreamingSideInputDoFnRunner.java:86
>>   
>> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-24_14_14_42-5619042994908733980=org%2Fapache.beam.runners.dataflow.worker%2FStreamingSideInputDoFnRunner.java=86=prosimo-test>
>>   )
>>   2. atorg.apache.beam.runners.dataflow.worker.
>>   SimpleParDoFn.processUserTimer (SimpleParDoFn.java:360
>>   
>> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-24_14_14_42-5619042994908733980=org%2Fapache.beam.runners.dataflow.worker%2FSimpleParDoFn.java=360=prosimo-test>
>>   )
>>   3. atorg.apache.beam.runners.dataflow.worker.
>>   SimpleParDoFn.access$600 (SimpleParDoFn.java:73
>>   
>> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-24_14_14_42-5619042994908733980=org%2Fapache.beam.runners.dataflow.worker%2FSimpleParDoFn.java=73=prosimo-test>
>>   )
>>   4. atorg.apache.beam.runners.dataflow.worker.
>>   SimpleParDoFn$TimerType$1.processTimer (SimpleParDoFn.java:444
>>   
>> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-24_14_14_42-5619042994908733980=org%2Fapache.beam.runners.dataflow.worker%2FSimpleParDoFn.java=444=prosimo-test>
>>   )
>>   5. atorg.apache.beam.runners.dataflow.worker.
>>   SimpleParDoFn.processTimers (SimpleParDoFn.java:473
>>   
>> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-24_14_14_42-5619042994908733980=org%2Fapache.beam.runners.dataflow.worker%2FSimpleParDoFn.java=473=prosimo-test>
>>   )
>>   6. atorg.apache.beam.runners.dataflow.worker.
>>   SimpleParDoFn.processTimers (SimpleParDoFn.java:353
>>   
>> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-24_14_14_42-5619042994908733980=org%2Fapache.beam.runners.dataflow.worker%2FSimpleParDoFn.java=353=prosimo-test>
>>

Re: On DRAIN: Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow.

2020-07-24 Thread Mohil Khare
Actually it is NOT JUST on DRAIN; I am seeing it during a regular RUN as well.
The job is getting flooded with this exception:

Attempt to deliver a timer to a DoFn, but timers are not supported in
Dataflow.
The changes that I did are the following (see the sketch after this list):
1. Read one set of logs from one of the Kafka topics and create KV(ID, log1)
2. Read a 2nd set of logs from another Kafka topic and create KV(ID, log2)
3. Put the above in a session window
4. CoGroupByKey both logs
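A minimal sketch of steps 1-4 above. This is only my reading of the description:
the topic names, brokers, String payloads, the 10-minute session gap, and the
assumption that the Kafka message key already carries the join ID are all
illustrative, not taken from the actual job.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

public class SessionJoinSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // 1. First set of logs as KV(ID, log1); key is assumed to be the join ID.
    PCollection<KV<String, String>> logs1 = p
        .apply("Read_Topic1", KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")
            .withTopic("topic1")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        // 3. Session window so related records from both topics land in the same session.
        .apply("Sessions_1", Window.<KV<String, String>>into(
            Sessions.withGapDuration(Duration.standardMinutes(10))));

    // 2. Second set of logs as KV(ID, log2), windowed the same way.
    PCollection<KV<String, String>> logs2 = p
        .apply("Read_Topic2", KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")
            .withTopic("topic2")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        .apply("Sessions_2", Window.<KV<String, String>>into(
            Sessions.withGapDuration(Duration.standardMinutes(10))));

    // 4. CoGroupByKey both logs on the shared ID.
    TupleTag<String> tag1 = new TupleTag<>();
    TupleTag<String> tag2 = new TupleTag<>();
    PCollection<KV<String, CoGbkResult>> joined = KeyedPCollectionTuple
        .of(tag1, logs1)
        .and(tag2, logs2)
        .apply("Join_Logs", CoGroupByKey.create());

    p.run();
  }
}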

Apart from the aforementioned exceptions, I am also seeing the following in the
worker logs. What I have noticed is that it happens when the pipeline has been
idle for a while, i.e. there are no new logs to be read from Kafka (not sure if
that is the actual cause).

2020-07-24 17:06:43.532 PDT
Execution of work for P188 for key
cloud@prosimo.ioHe8fc8079-c844-11ea-a6d5-dabe1eb9c630 failed. Will retry
locally.
<https://console.cloud.google.com/logs/viewer?advancedFilter=insertId%3D%224729149996009968456:27267:0:42419165%22%20resource.type%3D%22dataflow_step%22%20resource.labels.job_id%3D%222020-07-24_16_23_20-4320159532106629306%22%20(logName%3D%22projects%2Fprosimo-test%2Flogs%2Fdataflow.googleapis.com%252Fworker%22)%20severity%3E%3DERROR%0A(timestamp%3E%222020-07-25T00:06:33.224Z%22%20OR%20(insertId%3E%224729149996009968456:27267:0:42264272%22%20AND%20timestamp%3D%222020-07-25T00:06:33.224Z%22))=true=JUMP_TO_TIME=2020-07-25T00:06:43.532Z=2020-07-25T00:06:43.532Z=prosimo-test>
2020-07-24 17:06:53.863 PDT
Uncaught exception:



On Fri, Jul 24, 2020 at 2:59 PM Mohil Khare  wrote:

> Hello,
>
> I am on java sdk 2.19 and using dataflow for beam job. I use Timers for my
> stateful transformations, but recently I started seeing the following
> exception on DRAINING a job. It used to work fine and not sure what changed.
>
> *java.lang.UnsupportedOperationException:*
>
*Attempt to deliver a timer to a DoFn, but timers are not supported in
Dataflow. *

>
>1.
>   1. atorg.apache.beam.runners.dataflow.worker.
>   StreamingSideInputDoFnRunner.onTimer (
>   StreamingSideInputDoFnRunner.java:86
>   
> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-24_14_14_42-5619042994908733980=org%2Fapache.beam.runners.dataflow.worker%2FStreamingSideInputDoFnRunner.java=86=prosimo-test>
>   )
>   2. atorg.apache.beam.runners.dataflow.worker.
>   SimpleParDoFn.processUserTimer (SimpleParDoFn.java:360
>   
> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-24_14_14_42-5619042994908733980=org%2Fapache.beam.runners.dataflow.worker%2FSimpleParDoFn.java=360=prosimo-test>
>   )
>   3. atorg.apache.beam.runners.dataflow.worker.
>   SimpleParDoFn.access$600 (SimpleParDoFn.java:73
>   
> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-24_14_14_42-5619042994908733980=org%2Fapache.beam.runners.dataflow.worker%2FSimpleParDoFn.java=73=prosimo-test>
>   )
>   4. atorg.apache.beam.runners.dataflow.worker.
>   SimpleParDoFn$TimerType$1.processTimer (SimpleParDoFn.java:444
>   
> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-24_14_14_42-5619042994908733980=org%2Fapache.beam.runners.dataflow.worker%2FSimpleParDoFn.java=444=prosimo-test>
>   )
>   5. atorg.apache.beam.runners.dataflow.worker.
>   SimpleParDoFn.processTimers (SimpleParDoFn.java:473
>   
> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-24_14_14_42-5619042994908733980=org%2Fapache.beam.runners.dataflow.worker%2FSimpleParDoFn.java=473=prosimo-test>
>   )
>   6. atorg.apache.beam.runners.dataflow.worker.
>   SimpleParDoFn.processTimers (SimpleParDoFn.java:353
>   
> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-24_14_14_42-5619042994908733980=org%2Fapache.beam.runners.dataflow.worker%2FSimpleParDoFn.java=353=prosimo-test>
>   )
>   7. atorg.apache.beam.runners.dataflow.worker.util.common.worker.
>   ParDoOperation.finish (ParDoOperation.java:52
>   
> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-24_14_14_42-5619042994908733980=org%2Fapache.beam.runners.dataflow.worker.util.common.worker%2FParDoOperation.java=52=prosimo-test>
>   )
>   8. atorg.apache.beam.runners.dataflow.worker.util.common.worker.
>   MapTaskExecutor.execute (MapTaskExecutor.java:85
>   
> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-24_14_14_42-5619042994908733980=org%2Fapache.beam.runners.dataflow.worker.util.common.worker%2FMapTaskExecutor.java=85=prosimo-test>
>   )
>   9. atorg.apache.beam.runners.dataflow.worker.
>   StreamingDataflowWorker.process (StreamingDataflowWorker.java:1350

On DRAIN: Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow.

2020-07-24 Thread Mohil Khare
Hello,

I am on Java SDK 2.19 and using Dataflow as the runner for my Beam job. I use
timers for my stateful transformations, but recently I started seeing the
following exception when DRAINING a job. It used to work fine and I am not sure
what changed.

java.lang.UnsupportedOperationException: Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow.

   at org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.onTimer (StreamingSideInputDoFnRunner.java:86)
   at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processUserTimer (SimpleParDoFn.java:360)
   at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$600 (SimpleParDoFn.java:73)
   at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$1.processTimer (SimpleParDoFn.java:444)
   at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers (SimpleParDoFn.java:473)
   at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers (SimpleParDoFn.java:353)
   at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish (ParDoOperation.java:52)
   at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute (MapTaskExecutor.java:85)
   at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process (StreamingDataflowWorker.java:1350)
   at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100 (StreamingDataflowWorker.java:152)

A couple of additional notes:

1. Sometime back I opened a Jira for an issue related to this while doing a DRAIN:
   https://issues.apache.org/jira/browse/BEAM-10053 (it looks like no one has taken a stab at that Jira yet).
2. I am not sure if the reason is the same and due to the multiple side inputs that I use in my PTransforms.


Any help would be appreciated.

Thanks and Regards
Mohil


Re: FileIO write to new folder every hour.

2020-07-10 Thread Mohil Khare
BTW, if you want the current hour based on your interval window, you can also do the following:

class DatePartitionedFileName implements FileIO.Write.FileNaming {

    // Joda-time formatter that prints just the hour, e.g. "17"
    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormat.forPattern("HH");

    private final String subpath;

    DatePartitionedFileName(String subpath) {
        this.subpath = subpath;
    }

    @Override
    public String getFilename(BoundedWindow window, PaneInfo pane, int numShards,
                              int shardIndex, Compression compression) {
        IntervalWindow intervalWindow = (IntervalWindow) window;
        return String.format(
                "%s/%s/file.parquet",
                subpath,
                DATE_FORMAT.print(intervalWindow.start()));
    }
}

Regards
Mohil

On Fri, Jul 10, 2020 at 3:39 PM Mohil Khare  wrote:

> Hello Julius,
>
> Well I do something similar using FileIO.Write.FileNaming i,e,
> https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.FileNaming.html
>
> You can do something like following:
> .apply(FileIO.*write*()
> .via(ParquetIO.*sink*(*getOutput_schema*()))
> .to(outputPath)
> .withNumShards(1)
>
>*.withNaming(new
> DatePartitionedFileName(subpath))*
>
> Where your DatePartitionedFileName will implete FileIO.Write.FileNaming
> and override its getFileName method i.e. something like following:
>
> class DatePartitionedFileName implements FileIO.Write.FileNaming {
>
>
> DatePartitionedFileName(String subpath) {
>
>
> }
>
>
>
>   @Override
>
> public String getFilename(BoundedWindow window, PaneInfo pane, int
> numShards, int shardIndex, Compression compression) {
>
> IntervalWindow intervalWindow = (IntervalWindow) window;
>
> return String.format(
>
> %s/file.parquet",
>
> subpath,
>
> some function like--> getCurrentHour());
>
> }
>
> }
>
> Hope this helps.
>
> Regards
> Mohil
>
>
> On Fri, Jul 10, 2020 at 1:51 PM Almeida, Julius 
> wrote:
>
>> Hi Team,
>>
>>
>>
>> I am trying to write files using FileIO, but currently all files fall
>> under same folder.
>>
>>
>>
>> I need to write to new folder every hour,
>>
>> eg.: /output/hour-01/files.*.-> events coming in at hour 1
>>
>>/output/hour-02/files.*.-> events coming in at hour 2
>>
>>
>>
>> My Code :
>>
>>
>>
>> parquetRecord.apply(*"Batch Events"*, Window.*into*(
>> FixedWindows.*of*(Duration.*standardMinutes*(10)))
>> .triggering(AfterWatermark.*pastEndOfWindow*())
>> .discardingFiredPanes()
>> .withAllowedLateness(Duration.*standardMinutes*(0)))
>> .apply(Distinct.*create*())
>>
>> .apply(FileIO.*write*()
>> .via(ParquetIO.*sink*(*getOutput_schema*()))
>> .to(outputPath + *"hour=" *+ *new *DateTime().toString(
>> *"HH"*) + *"/"*).withNumShards(1)
>> .withSuffix(*".snappy.parquet"*));
>>
>>
>>
>>
>>
>> Thanks,
>>
>> Julius
>>
>>
>>
>>
>>
>>
>>
>


Re: FileIO write to new folder every hour.

2020-07-10 Thread Mohil Khare
Hello Julius,

Well, I do something similar using FileIO.Write.FileNaming, i.e.
https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.FileNaming.html

You can do something like the following:

.apply(FileIO.write()
    .via(ParquetIO.sink(getOutput_schema()))
    .to(outputPath)
    .withNumShards(1)
    .withNaming(new DatePartitionedFileName(subpath)))

Where your DatePartitionedFileName will implement FileIO.Write.FileNaming and
override its getFilename method, i.e. something like the following:

class DatePartitionedFileName implements FileIO.Write.FileNaming {

    private final String subpath;

    DatePartitionedFileName(String subpath) {
        this.subpath = subpath;
    }

    @Override
    public String getFilename(BoundedWindow window, PaneInfo pane, int numShards,
                              int shardIndex, Compression compression) {
        IntervalWindow intervalWindow = (IntervalWindow) window;
        return String.format(
                "%s/%s/file.parquet",
                subpath,
                getCurrentHour()); // some function like new DateTime().toString("HH")
    }
}
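For completeness, a sketch (my own combination of the two snippets above, not
something from the original mails) of how this naming pairs with an hourly
window, so the folder is derived from the window's event time rather than from
the wall clock at pipeline-construction time. parquetRecord, outputPath and
getOutput_schema() are assumed to be the same objects as in the quoted code; the
"output" subpath is illustrative.

parquetRecord
    .apply("Hourly_Window", Window.<GenericRecord>into(FixedWindows.of(Duration.standardHours(1))))
    .apply("Write_Hourly_Folders", FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(getOutput_schema()))
        .to(outputPath)
        .withNumShards(1)
        .withNaming(new DatePartitionedFileName("output")));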

Hope this helps.

Regards
Mohil


On Fri, Jul 10, 2020 at 1:51 PM Almeida, Julius 
wrote:

> Hi Team,
>
>
>
> I am trying to write files using FileIO, but currently all files fall
> under same folder.
>
>
>
> I need to write to new folder every hour,
>
> eg.: /output/hour-01/files.*.-> events coming in at hour 1
>
>/output/hour-02/files.*.-> events coming in at hour 2
>
>
>
> My Code :
>
>
>
> parquetRecord.apply(*"Batch Events"*, Window.*into*(
> FixedWindows.*of*(Duration.*standardMinutes*(10)))
> .triggering(AfterWatermark.*pastEndOfWindow*())
> .discardingFiredPanes()
> .withAllowedLateness(Duration.*standardMinutes*(0)))
> .apply(Distinct.*create*())
>
> .apply(FileIO.*write*()
> .via(ParquetIO.*sink*(*getOutput_schema*()))
> .to(outputPath + *"hour=" *+ *new *DateTime().toString(
> *"HH"*) + *"/"*).withNumShards(1)
> .withSuffix(*".snappy.parquet"*));
>
>
>
>
>
> Thanks,
>
> Julius
>
>
>
>
>
>
>


Carry forward state information from one window to next

2020-07-10 Thread Mohil Khare
Hello,

I am using Beam on Dataflow with Java SDK 2.19.0.
I have a use case where I need to collect some messages in a short window of a
few seconds to 1 minute, update state (Beam stateful processing), and carry this
state information forward to the next window, using it to initialize the next
window's state, i.e. something like the following:

[image: image.png]

Since the windows are very short, I don't want to write to an external DB on
window expiry and read back from the DB in the next window. Is it possible to
carry state information forward from one window to the next? Or is there any
other suggestion to achieve this? (One possible shape is sketched below.)
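One possible shape for this, sketched under my own assumptions (the thread
itself does not contain an answer): do the short-window aggregation first, then
re-window into the global window and hold the carried value in per-key state,
which survives across the short upstream windows. The KV<String, Long> element
type and the "add the previous value" logic are placeholders. The state API
classes come from org.apache.beam.sdk.state.

static class CarryForwardFn extends DoFn<KV<String, Long>, KV<String, Long>> {

  @StateId("carried")
  private final StateSpec<ValueState<Long>> carriedSpec = StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void process(@Element KV<String, Long> element,
                      @StateId("carried") ValueState<Long> carried,
                      OutputReceiver<KV<String, Long>> out) {
    Long previous = carried.read();                       // value carried over from earlier windows
    long updated = (previous == null ? 0L : previous) + element.getValue();
    carried.write(updated);
    out.output(KV.of(element.getKey(), updated));
  }
}

// perWindowAggregates: the KV<String, Long> results produced by the short windows
perWindowAggregates
    .apply("Back_To_Global", Window.<KV<String, Long>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply("Carry_Forward", ParDo.of(new CarryForwardFn()));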

Thanks and regards
Mohil


Re: Unable to read value from state/Unable to fetch data due to token mismatch for key

2020-07-09 Thread Mohil Khare
Thanks Reuven for your reply.
Good to know that it is benign.

Regards
Mohil

On Wed, Jul 8, 2020 at 10:19 PM Reuven Lax  wrote:

> This error should be benign. It often means that ownership of the work
> item was moved to a different worker (possibly caused by autoscaling or
> other source of work rebalancing), so the in-progress work item on that
> worker failed. However the work item will be processed successfully on the
> new worker that owns it. This should not cause a persistent failure.
>
> On Wed, Jul 8, 2020 at 9:53 PM Mohil Khare  wrote:
>
>> Hello,
>>
>> I am using beam java sdk 2.19.0 (with enableStreamingEngine set as true)
>> and very heavily use stateful beam processing model.
>> However, sometimes I am seeing the following exception while reading
>> value from state for a key (Please note : here my key is a POJO where
>> fields create a kind of composite key. Also I am using AvroCoder for this
>> key):
>>
>> Caused by: java.util.concurrent.ExecutionException:
>> org.apache.beam.runners.dataflow.worker.KeyTokenInvalidException: Unable to
>> fetch data due to token mismatch for key
>> 0ggadot_static@prosimo.ioHaa552bec-25f2-11ea-8705-267acc424a25H9219bdd5-335f-11ea-bd4f-de07a30b09ca
>> @ OC-AU sydney
>>
>>1.
>>   1. at
>>   
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.
>>   AbstractFuture.getDoneValue (AbstractFuture.java:531
>>   
>> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-02_10_32_35-16490648164981155775=org%2Fapache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent%2FAbstractFuture.java=531=prosimo>
>>   )
>>   2. at
>>   
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.
>>   AbstractFuture.get (AbstractFuture.java:492
>>   
>> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-02_10_32_35-16490648164981155775=org%2Fapache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent%2FAbstractFuture.java=492=prosimo>
>>   )
>>   3. at
>>   
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.
>>   AbstractFuture$TrustedFuture.get (AbstractFuture.java:83
>>   
>> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-02_10_32_35-16490648164981155775=org%2Fapache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent%2FAbstractFuture.java=83=prosimo>
>>   )
>>   4. at
>>   
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.
>>   ForwardingFuture.get (ForwardingFuture.java:62
>>   
>> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-02_10_32_35-16490648164981155775=org%2Fapache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent%2FForwardingFuture.java=62=prosimo>
>>   )
>>   5. atorg.apache.beam.runners.dataflow.worker.
>>   WindmillStateReader$WrappedFuture.get (WindmillStateReader.java:316
>>   
>> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-02_10_32_35-16490648164981155775=org%2Fapache.beam.runners.dataflow.worker%2FWindmillStateReader.java=316=prosimo>
>>   )
>>   6. atorg.apache.beam.runners.dataflow.worker.
>>   WindmillStateInternals$WindmillValue.read (
>>   WindmillStateInternals.java:385
>>   
>> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-02_10_32_35-16490648164981155775=org%2Fapache.beam.runners.dataflow.worker%2FWindmillStateInternals.java=385=prosimo>
>>   )
>>
>> Caused by: org.apache.beam.runners.dataflow.worker.KeyTokenInvalidException:
>> Unable to fetch data due to token mismatch for key 
>>
>>1.
>>   1. atorg.apache.beam.runners.dataflow.worker.
>>   WindmillStateReader.consumeResponse (WindmillStateReader.java:482
>>   
>> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-02_10_32_35-16490648164981155775=org%2Fapache.beam.runners.dataflow.worker%2FWindmillStateReader.java=482=prosimo>
>>   )
>>   2. atorg.apache.beam.runners.dataflow.worker.
>>   WindmillStateReader.startBatchAndBlock (
>>   WindmillStateReader.java:420
>>   
>> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-02_10_32_35-16490648164981155775=org%2Fapache.beam.runners.dataflow.worker%2FWindmillStateReader.java=420=prosimo>
>>   )
>>   3. atorg.apache.beam.runners.dataflow.worker.
>>   WindmillStateReader$WrappedFuture.get (WindmillStateReader.java:313
>>   
>> <https://console.cloud.google.com/debug/fromlog?appModule=Dataflow%20Jobs=2020-07-02_10_32_35-16490648164981155775=org%2Fapache.beam.runners.dataflow.worker%2FWindmillStateReader.java=313=prosimo>
>>   )
>>
>>
>> Any help to fix this issue would be greatly appreciated.
>>
>> Thanks and Regards
>> Mohil
>>
>


Unable to read value from state/Unable to fetch data due to token mismatch for key

2020-07-08 Thread Mohil Khare
Hello,

I am using Beam Java SDK 2.19.0 (with enableStreamingEngine set to true) and use
the Beam stateful processing model very heavily.
However, I sometimes see the following exception while reading a value from
state for a key. (Please note: my key is a POJO whose fields form a kind of
composite key, and I am using AvroCoder for this key.)

Caused by: java.util.concurrent.ExecutionException:
org.apache.beam.runners.dataflow.worker.KeyTokenInvalidException: Unable to
fetch data due to token mismatch for key
0ggadot_static@prosimo.ioHaa552bec-25f2-11ea-8705-267acc424a25H9219bdd5-335f-11ea-bd4f-de07a30b09ca
@ OC-AU sydney

   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.getDoneValue (AbstractFuture.java:531)
   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.get (AbstractFuture.java:492)
   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get (AbstractFuture.java:83)
   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ForwardingFuture.get (ForwardingFuture.java:62)
   at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get (WindmillStateReader.java:316)
   at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillValue.read (WindmillStateInternals.java:385)

Caused by: org.apache.beam.runners.dataflow.worker.KeyTokenInvalidException:
Unable to fetch data due to token mismatch for key 

   at org.apache.beam.runners.dataflow.worker.WindmillStateReader.consumeResponse (WindmillStateReader.java:482)
   at org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock (WindmillStateReader.java:420)
   at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get (WindmillStateReader.java:313)


Any help to fix this issue would be greatly appreciated.

Thanks and Regards
Mohil


Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

2020-06-26 Thread Mohil Khare
I have opened the following ticket:
https://issues.apache.org/jira/browse/BEAM-10339

Please let me know if there is some other place where I need to open a support
ticket.

Thank you
Mohil



On Fri, Jun 26, 2020 at 11:13 AM Mohil Khare  wrote:

> Sure not a problem. I will open one.
>
> Thanks
> Mohil
>
> On Fri, Jun 26, 2020 at 10:55 AM Luke Cwik  wrote:
>
>> Sorry, I didn't open a support case. You should open the support case.
>>
>> On Fri, Jun 26, 2020 at 10:41 AM Mohil Khare  wrote:
>>
>>> Thanks a lot Luke for following up on this and opening a dataflow
>>> support.  It would be good to know how streamingEngine solved the problem.
>>> I will really appreciate it if you can share a link for a support case
>>> once you open it (if it is possible).
>>>
>>> Thanks and Regards
>>> Mohil
>>>
>>>
>>>
>>> On Fri, Jun 26, 2020 at 8:30 AM Luke Cwik  wrote:
>>>
>>>> It seems as though we have seen this failure before for Dataflow and it
>>>> was caused because the side input tags needed to be unique in a streaming
>>>> pipeline.
>>>>
>>>> It looked like this used to be a common occurrence in the Python SDK[1,
>>>> 2] because it generated tags that weren't unique enough.
>>>>
>>>> I would open up a case with Dataflow support with all the information
>>>> you have provided here.
>>>>
>>>> 1: https://issues.apache.org/jira/browse/BEAM-4549
>>>> 2: https://issues.apache.org/jira/browse/BEAM-4534
>>>>
>>>> On Thu, Jun 25, 2020 at 9:30 PM Mohil Khare  wrote:
>>>>
>>>>> Hi Luke and all,
>>>>>
>>>>> UPDATE: So when I started my job by *enabling the streaming engine
>>>>> and keeping the machine type default for the streaming engine
>>>>> (n1-standard-2)*, the pipeline started successfully.
>>>>> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine
>>>>>
>>>>> Still evaluating to make sure that there is no performance degradation
>>>>> by doing so.
>>>>>
>>>>> Thanks and regards
>>>>> Mohil
>>>>>
>>>>>
>>>>> On Thu, Jun 25, 2020 at 11:44 AM Mohil Khare  wrote:
>>>>>
>>>>>> Hi Luke,
>>>>>>
>>>>>> Let me give you some more details about the code.
>>>>>> As I mentioned before, I am using java sdk 2.19.0 on dataflow.
>>>>>>
>>>>>> Default machine type which n1-standard-4.
>>>>>> Didn't set any numWorkerHarnessThreads (I believe beam/dataflow picks
>>>>>> it up based on number of cores available)
>>>>>>
>>>>>> 1: Code listens for some trigger on pubsub topic:
>>>>>> /**
>>>>>>
>>>>>>  * Read From PubSub for topic ANALYTICS_UPDATE and create 
>>>>>> PCollection indicating main pipeline to reload * relevant 
>>>>>> DataAnalyticsData from BQ table */static class 
>>>>>> MonitorPubSubForDailyAnalyticsDataStatus extends PTransform>>>>> PCollection> {private final 
>>>>>> String subscriptionName;private final String jobProject;
>>>>>> MonitorPubSubForDailyAnalyticsDataStatus(String subscriptionName, String 
>>>>>> jobProject) {this.subscriptionName = subscriptionName;   
>>>>>>  this.jobProject = jobProject;}@Override
>>>>>> public PCollection expand(PBegin input) 
>>>>>> {return input.getPipeline()
>>>>>> .apply("Read_PubSub_Messages", 
>>>>>> PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription(subscriptionName))
>>>>>> .apply("Applying_Windowing", 
>>>>>> Window.into(new GlobalWindows())
>>>>>> .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>>>> .discardingFiredPanes())
>>>>>> .apply("Read_Update_Status", ParDo.of(new DoFn>>>>> POJORepresentingJobCompleteInfo>() {@ProcessElement  
>>>>>>   public void processElement(@Element PubsubMessage 
>>>>>> input, OutputRec

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

2020-06-26 Thread Mohil Khare
Sure not a problem. I will open one.

Thanks
Mohil

On Fri, Jun 26, 2020 at 10:55 AM Luke Cwik  wrote:

> Sorry, I didn't open a support case. You should open the support case.
>
> On Fri, Jun 26, 2020 at 10:41 AM Mohil Khare  wrote:
>
>> Thanks a lot Luke for following up on this and opening a dataflow
>> support.  It would be good to know how streamingEngine solved the problem.
>> I will really appreciate it if you can share a link for a support case
>> once you open it (if it is possible).
>>
>> Thanks and Regards
>> Mohil
>>
>>
>>
>> On Fri, Jun 26, 2020 at 8:30 AM Luke Cwik  wrote:
>>
>>> It seems as though we have seen this failure before for Dataflow and it
>>> was caused because the side input tags needed to be unique in a streaming
>>> pipeline.
>>>
>>> It looked like this used to be a common occurrence in the Python SDK[1,
>>> 2] because it generated tags that weren't unique enough.
>>>
>>> I would open up a case with Dataflow support with all the information
>>> you have provided here.
>>>
>>> 1: https://issues.apache.org/jira/browse/BEAM-4549
>>> 2: https://issues.apache.org/jira/browse/BEAM-4534
>>>
>>> On Thu, Jun 25, 2020 at 9:30 PM Mohil Khare  wrote:
>>>
>>>> Hi Luke and all,
>>>>
>>>> UPDATE: So when I started my job by *enabling the streaming engine and
>>>> keeping the machine type default for the streaming engine (n1-standard-2)*,
>>>> the pipeline started successfully.
>>>> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine
>>>>
>>>> Still evaluating to make sure that there is no performance degradation
>>>> by doing so.
>>>>
>>>> Thanks and regards
>>>> Mohil
>>>>
>>>>
>>>> On Thu, Jun 25, 2020 at 11:44 AM Mohil Khare  wrote:
>>>>
>>>>> Hi Luke,
>>>>>
>>>>> Let me give you some more details about the code.
>>>>> As I mentioned before, I am using java sdk 2.19.0 on dataflow.
>>>>>
>>>>> Default machine type which n1-standard-4.
>>>>> Didn't set any numWorkerHarnessThreads (I believe beam/dataflow picks
>>>>> it up based on number of cores available)
>>>>>
>>>>> 1: Code listens for some trigger on pubsub topic:
>>>>> /**
>>>>>
>>>>>  * Read From PubSub for topic ANALYTICS_UPDATE and create 
>>>>> PCollection indicating main pipeline to reload * relevant 
>>>>> DataAnalyticsData from BQ table */static class 
>>>>> MonitorPubSubForDailyAnalyticsDataStatus extends PTransform>>>> PCollection> {private final 
>>>>> String subscriptionName;private final String jobProject;
>>>>> MonitorPubSubForDailyAnalyticsDataStatus(String subscriptionName, String 
>>>>> jobProject) {this.subscriptionName = subscriptionName;
>>>>> this.jobProject = jobProject;}@Overridepublic 
>>>>> PCollection expand(PBegin input) {   
>>>>>  return input.getPipeline()
>>>>> .apply("Read_PubSub_Messages", 
>>>>> PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription(subscriptionName))
>>>>> .apply("Applying_Windowing", 
>>>>> Window.into(new GlobalWindows())
>>>>> .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) 
>>>>>.discardingFiredPanes())
>>>>> .apply("Read_Update_Status", ParDo.of(new DoFn>>>> POJORepresentingJobCompleteInfo>() {@ProcessElement   
>>>>>  public void processElement(@Element PubsubMessage input, 
>>>>> OutputReceiver out) {
>>>>> /*** Read and CReate ***/
>>>>>
>>>>> out.output(POJORepresentingJobCompleteInfo);  
>>>>>  }}));}}
>>>>>
>>>>> 2: Get Latest Updated and Reload new Updates from various BQ tables using 
>>>>> google cloud bigquery library 
>>>>> (https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries)
>>>>>

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

2020-06-26 Thread Mohil Khare
Thanks a lot, Luke, for following up on this and opening a Dataflow support case.
It would be good to know how the Streaming Engine solved the problem.
I would really appreciate it if you could share a link to the support case once
you open it (if that is possible).

Thanks and Regards
Mohil



On Fri, Jun 26, 2020 at 8:30 AM Luke Cwik  wrote:

> It seems as though we have seen this failure before for Dataflow and it
> was caused because the side input tags needed to be unique in a streaming
> pipeline.
>
> It looked like this used to be a common occurrence in the Python SDK[1, 2]
> because it generated tags that weren't unique enough.
>
> I would open up a case with Dataflow support with all the information you
> have provided here.
>
> 1: https://issues.apache.org/jira/browse/BEAM-4549
> 2: https://issues.apache.org/jira/browse/BEAM-4534
>
> On Thu, Jun 25, 2020 at 9:30 PM Mohil Khare  wrote:
>
>> Hi Luke and all,
>>
>> UPDATE: So when I started my job by *enabling the streaming engine and
>> keeping the machine type default for the streaming engine (n1-standard-2)*,
>> the pipeline started successfully.
>> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine
>>
>> Still evaluating to make sure that there is no performance degradation by
>> doing so.
>>
>> Thanks and regards
>> Mohil
>>
>>
>> On Thu, Jun 25, 2020 at 11:44 AM Mohil Khare  wrote:
>>
>>> Hi Luke,
>>>
>>> Let me give you some more details about the code.
>>> As I mentioned before, I am using java sdk 2.19.0 on dataflow.
>>>
>>> Default machine type which n1-standard-4.
>>> Didn't set any numWorkerHarnessThreads (I believe beam/dataflow picks it
>>> up based on number of cores available)
>>>
>>> 1: Code listens for some trigger on pubsub topic:
>>> /**
>>>
>>>  * Read From PubSub for topic ANALYTICS_UPDATE and create 
>>> PCollection indicating main pipeline to reload * relevant 
>>> DataAnalyticsData from BQ table */static class 
>>> MonitorPubSubForDailyAnalyticsDataStatus extends PTransform>> PCollection> {private final String 
>>> subscriptionName;private final String jobProject;
>>> MonitorPubSubForDailyAnalyticsDataStatus(String subscriptionName, String 
>>> jobProject) {this.subscriptionName = subscriptionName;  
>>>   this.jobProject = jobProject;}@Overridepublic 
>>> PCollection expand(PBegin input) { 
>>>return input.getPipeline().apply("Read_PubSub_Messages", 
>>> PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription(subscriptionName))
>>> .apply("Applying_Windowing", Window.into(new 
>>> GlobalWindows())
>>> .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))   
>>>  .discardingFiredPanes())
>>> .apply("Read_Update_Status", ParDo.of(new DoFn>> POJORepresentingJobCompleteInfo>() {@ProcessElement 
>>>public void processElement(@Element PubsubMessage input, 
>>> OutputReceiver out) {  
>>>   /*** Read and CReate ***/
>>>
>>> out.output(POJORepresentingJobCompleteInfo);
>>>}}));}}
>>>
>>> 2: Get Latest Updated and Reload new Updates from various BQ tables using 
>>> google cloud bigquery library 
>>> (https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries)
>>>
>>> PCollection analyticsDataStatusUpdates 
>>> = p.apply("Get_Analytics_Data_Status_Updates_pubsub",
>>>
>>> new MonitorPubSubForDailyAnalyticsDataStatus(subscriptionName, 
>>> jobProject));
>>>
>>>
>>> 3. Create various PCollectionViews to be used as side input for decorating 
>>> stream of logs coming from Kafka (will be shown later)
>>>
>>>PCollectionView> Stats1View =
>>>
>>> analyticsDataStatusUpdates 
>>> .apply("Reload_Stats1_FromBQ", new ReadStats1()) 
>>> .apply("View_Stats1", View.asSingleton());
>>>
>>>
>>>PCollectionView> Stats2View =
>>>
>>> analyticsDataStatusUpdates 
>>> .apply("Reload_Stats1_FromBQ", ne

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

2020-06-25 Thread Mohil Khare
Hi Luke and all,

UPDATE: So when I started my job by *enabling the streaming engine and
keeping the machine type default for the streaming engine (n1-standard-2)*,
the pipeline started successfully.
https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine

Still evaluating to make sure that there is no performance degradation by
doing so.
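For reference, a sketch of the programmatic options that I believe correspond to
this setup (the option names are my assumption based on the Dataflow docs linked
above; verify against your SDK version):

DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
options.setEnableStreamingEngine(true);        // same effect as --enableStreamingEngine
options.setWorkerMachineType("n1-standard-2"); // the Streaming Engine default mentioned above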

Thanks and regards
Mohil


On Thu, Jun 25, 2020 at 11:44 AM Mohil Khare  wrote:

> Hi Luke,
>
> Let me give you some more details about the code.
> As I mentioned before, I am using java sdk 2.19.0 on dataflow.
>
> Default machine type which n1-standard-4.
> Didn't set any numWorkerHarnessThreads (I believe beam/dataflow picks it
> up based on number of cores available)
>
> 1: Code listens for some trigger on pubsub topic:
> /**
>
>  * Read From PubSub for topic ANALYTICS_UPDATE and create 
> PCollection indicating main pipeline to reload * relevant 
> DataAnalyticsData from BQ table */static class 
> MonitorPubSubForDailyAnalyticsDataStatus extends PTransform PCollection> {private final String 
> subscriptionName;private final String jobProject;
> MonitorPubSubForDailyAnalyticsDataStatus(String subscriptionName, String 
> jobProject) {this.subscriptionName = subscriptionName;
> this.jobProject = jobProject;}@Overridepublic 
> PCollection expand(PBegin input) {   
>  return input.getPipeline().apply("Read_PubSub_Messages", 
> PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription(subscriptionName))
> .apply("Applying_Windowing", Window.into(new 
> GlobalWindows())
> .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) 
>.discardingFiredPanes()).apply("Read_Update_Status", 
> ParDo.of(new DoFn() { 
>@ProcessElementpublic void 
> processElement(@Element PubsubMessage input, 
> OutputReceiver out) {
> /*** Read and CReate ***/
>
> out.output(POJORepresentingJobCompleteInfo);  
>  }}));}}
>
> 2: Get Latest Updated and Reload new Updates from various BQ tables using 
> google cloud bigquery library 
> (https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries)
>
> PCollection analyticsDataStatusUpdates = 
> p.apply("Get_Analytics_Data_Status_Updates_pubsub",
>
> new MonitorPubSubForDailyAnalyticsDataStatus(subscriptionName, 
> jobProject));
>
>
> 3. Create various PCollectionViews to be used as side input for decorating 
> stream of logs coming from Kafka (will be shown later)
>
>PCollectionView> Stats1View =
>
> analyticsDataStatusUpdates 
> .apply("Reload_Stats1_FromBQ", new ReadStats1()) 
> .apply("View_Stats1", View.asSingleton());
>
>
>PCollectionView> Stats2View =
>
> analyticsDataStatusUpdates 
> .apply("Reload_Stats1_FromBQ", new ReadStats2()) 
> .apply("View_Stats1", View.asSingleton());
>
>.
>
>.
>
>.
>
>. and so one
>
>
> 4. An example of code where we read stats from BQ i.e. in ReadStats1(), 
> ReadStats2() and so on
>
>class ReadStatsS1 extends 
> PTransform, 
> PCollection>> {
>
>   @Overridepublic PCollection> 
> expand(PCollection input) {return 
> input.apply("Read_From_BigQuery", ParDo.of(new BigQueryRread()))  
>   .apply("Applying_Windowing", Window. Stats1>>into(new GlobalWindows())
> .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) 
>.discardingFiredPanes());}private class BigQueryRread extends 
> DoFn> {
> @ProcessElementpublic void process(@Element 
> POJORepresentingJobCompleteInfo input, ProcessContext c) {
> Map resultMap = new HashMap<>();   try 
> {BigQuery bigQueryClient = 
> BigQueryOptions.getDefaultInstance().getService();String 
> sqlQuery = getSqlQuery(input); ///some method to return desired sql query 
> based on info present in inputQueryJobConfiguration 
> queryJobConfiguration =
> QueryJobConfiguration.newBuilder(sqlQuery).setUseLegacySql(false).build();
> // Create a job ID so that we can safely retry.
> JobId jobId = JobId.of(UUID

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

2020-06-25 Thread Mohil Khare
                                resultMap.put(key, stats);
                            }
                        }
                    }
                }
            } catch (Exception ex) {
                logger.p1Error("Error in executing sql query against Big Query", ex);
            }
            logger.info("Emitting map of size: {}", resultMap.size());
            c.output(resultMap);
        }
    }

As I mentioned before, all the classes (ReadStats1(), ReadStats2(), etc.) follow
the above code design.

5. Using KafkaIO we read a continuous stream of data from Kafka:

PCollection Logs = p
    .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
        .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
        .withTopic("topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withConsumerConfigUpdates(kafkaConsumerProperties)
        .withConsumerFactoryFn(consumerFactoryObj)
        .commitOffsetsInFinalize())
    .apply("Applying_Fixed_Window_Logs", Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))))
        .withAllowedLateness(Duration.standardDays(1))
        .discardingFiredPanes())
    .apply("Convert_KafkaRecord_To_PCollection", ParDo.of(new ParseLogs()));


6. Take these logs and apply another transform that consumes the aforementioned
BQ reads as side inputs, i.e. something like this (a fuller sketch follows below):

Logs.apply("decorate", new Decorate().withSideInput(Stats1View, Stats2View, ...));
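Since Decorate itself is not shown in the thread, here is a minimal sketch of how
such a decorating ParDo typically attaches and reads the views. The Log, Stats1,
Stats2 types, the String map key, and the enrichment step are all assumptions on
my part.

PCollection<Log> decorated = Logs.apply("Decorate_With_BQ_Stats",
    ParDo.of(new DoFn<Log, Log>() {
      @ProcessElement
      public void process(ProcessContext c) {
        Map<String, Stats1> stats1 = c.sideInput(Stats1View);  // latest reload from BQ
        Map<String, Stats2> stats2 = c.sideInput(Stats2View);
        Log log = c.element();
        // ... enrich the log using stats1/stats2 lookups ...
        c.output(log);
      }
    }).withSideInputs(Stats1View, Stats2View));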


Please note: I tried commenting out the code where I added the side inputs to
the above transform and still ended up with the same crash. So the issue is
definitely in adding more than a certain number of PCollectionView transforms.

I already had 3-4 such transforms and it was working fine; yesterday I added a
few more and started seeing crashes.

If I enable just one of the newly added PCollectionView transforms (keeping the
old 3-4 intact), then everything works fine. The moment I enable another new
transform, the crash happens.


Hope this provides some more insight. Let me know if I need to open a ticket,
whether I am doing something wrong, or whether some extra configuration or a
different worker machine type needs to be provided.


Thanks and Regards

Mohil


On Thu, Jun 25, 2020 at 11:01 AM Mohil Khare  wrote:

> Hi Luke,
>
> I can send you a code snippet with more details if it helps.
>
> BTW found similar issue here:
> http://mail-archives.apache.org/mod_mbox/beam-user/201801.mbox/%3ccaf9t7_74pkr7fj51-6_tbsycz9aiz_xsm7rcali5kmkd1ng...@mail.gmail.com%3E
>
> Thanks and Regards
> Mohil
>
> On Thu, Jun 25, 2020 at 10:58 AM Mohil Khare  wrote:
>
>> Hi Luke,
>> Thanks for your response, I tried looking at worker logs using the
>> logging service of GCP and unable to get a clear picture. Not sure if its
>> due to memory pressure or low number of harness threads.
>> Attaching a few more screenshots of crash logs that I found as wells json
>> dump of logs.
>>
>> Let me know if you still think opening a ticket is a right way to go.
>>
>> Thanks and regards
>> Mohil
>>
>> On Thu, Jun 25, 2020 at 10:00 AM Luke Cwik  wrote:
>>
>>> Try looking at the worker logs to get a full stack trace. Take a look at
>>> this page for some debugging guidance[1] or consider opening a support case
>>> with GCP.
>>>
>>> 1:
>>> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>>
>>> On Thu, Jun 25, 2020 at 1:42 AM Mohil Khare  wrote:
>>>
>>>> BTW, just to make sure that there is no issue with any individual
>>>> PTransform, I enabled each one of them one by one and the pipeline started
>>>> successfully. Issue happens as soon as I enable more than one new
>>>> aforementioned PTransform.
>>>>
>>>> Thanks and regards
>>>> Mohil
>>>>
>>>> On Thu, Jun 25, 2020 at 1:26 AM Mohil Khare  wrote:
>>>>
>>>>> Hello All,
>>>>>
>>>>> I am using the BEAM java 2.19.0 version on google dataflow.
>>>>>
>>>>> Need urgent help in debugging one issue.
>>>>>
>>>>> I recently added 3-4 new PTransformations. to an existing pipeline
>>>>> where I read data from BQ for a certain timestamp and create
>>>>> PCollectionView> to be used as side input in other
>>>>> PTransforms.
>>>>>
>>>>> i.e. something like this:
>>>>>
>>>>> /**
>>>>>  * Get PCollectionView Stats1
>>>>>  */
>>>>

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

2020-06-25 Thread Mohil Khare
Hi Luke,

I can send you a code snippet with more details if it helps.

BTW, I found a similar issue reported here:
http://mail-archives.apache.org/mod_mbox/beam-user/201801.mbox/%3ccaf9t7_74pkr7fj51-6_tbsycz9aiz_xsm7rcali5kmkd1ng...@mail.gmail.com%3E

Thanks and Regards
Mohil

On Thu, Jun 25, 2020 at 10:58 AM Mohil Khare  wrote:

> Hi Luke,
> Thanks for your response, I tried looking at worker logs using the logging
> service of GCP and unable to get a clear picture. Not sure if its due to
> memory pressure or low number of harness threads.
> Attaching a few more screenshots of crash logs that I found as wells json
> dump of logs.
>
> Let me know if you still think opening a ticket is a right way to go.
>
> Thanks and regards
> Mohil
>
> On Thu, Jun 25, 2020 at 10:00 AM Luke Cwik  wrote:
>
>> Try looking at the worker logs to get a full stack trace. Take a look at
>> this page for some debugging guidance[1] or consider opening a support case
>> with GCP.
>>
>> 1:
>> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>
>> On Thu, Jun 25, 2020 at 1:42 AM Mohil Khare  wrote:
>>
>>> BTW, just to make sure that there is no issue with any individual
>>> PTransform, I enabled each one of them one by one and the pipeline started
>>> successfully. Issue happens as soon as I enable more than one new
>>> aforementioned PTransform.
>>>
>>> Thanks and regards
>>> Mohil
>>>
>>> On Thu, Jun 25, 2020 at 1:26 AM Mohil Khare  wrote:
>>>
>>>> Hello All,
>>>>
>>>> I am using the BEAM java 2.19.0 version on google dataflow.
>>>>
>>>> Need urgent help in debugging one issue.
>>>>
>>>> I recently added 3-4 new PTransformations. to an existing pipeline
>>>> where I read data from BQ for a certain timestamp and create
>>>> PCollectionView> to be used as side input in other
>>>> PTransforms.
>>>>
>>>> i.e. something like this:
>>>>
>>>> /**
>>>>  * Get PCollectionView Stats1
>>>>  */
>>>> PCollectionView> stats1View =
>>>> jobCompleteStatus
>>>> .apply("Reload_MonthlyS2Stats_FromBQ", new ReadStatsS1())
>>>> .apply("View_S1STATS", View.asSingleton());
>>>>
>>>> /**
>>>>  * Get PCollectionView of Stats2
>>>>  */
>>>> PCollectionView> stats2View =
>>>> jobCompleteStatus
>>>> .apply("Reload_OptimalAppCharsInfo_FromBQ", new ReadStatsS2())
>>>> .apply("View_S2STATS", View.asSingleton());
>>>>
>>>>
>>>> and a couple more like these PTransforms. Here jobCompleteStatus is a 
>>>> message
>>>>
>>>> received from PubSub that act as a trigger to reload these views.
>>>>
>>>> The moment I deployed the above pipeline, it didn't start and
>>>>
>>>> error reporting gave weird exceptions(see attached screenshot1 and 
>>>> screenshot) which I don't know how to debug.
>>>>
>>>>
>>>> Then as an experiment I made a change where I enabled only one new 
>>>> transformation
>>>>
>>>> and disabled others. This time I didn't see any issue.
>>>>
>>>> So it looks like some memory issue.
>>>>
>>>> I also compared worker logs between working case and non working case
>>>>
>>>> and it looks resources were not granted in non working case.
>>>>
>>>> (See attached working-workerlogs and nonworking-workerlogs)
>>>>
>>>> I could't find any other log.
>>>>
>>>>
>>>> I would really appreciate quick help here.
>>>>
>>>>
>>>> Thanks and Regards
>>>>
>>>> Mohil
>>>>
>>>>
>>>>


Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

2020-06-25 Thread Mohil Khare
BTW, just to make sure that there is no issue with any individual PTransform,
I enabled each one of them one by one and the pipeline started successfully.
The issue happens as soon as I enable more than one of the aforementioned new
PTransforms.

Thanks and regards
Mohil

On Thu, Jun 25, 2020 at 1:26 AM Mohil Khare  wrote:

> Hello All,
>
> I am using the BEAM java 2.19.0 version on google dataflow.
>
> Need urgent help in debugging one issue.
>
> I recently added 3-4 new PTransformations. to an existing pipeline where I
> read data from BQ for a certain timestamp and create
> PCollectionView> to be used as side input in other
> PTransforms.
>
> i.e. something like this:
>
> /**
>  * Get PCollectionView Stats1
>  */
> PCollectionView> stats1View =
> jobCompleteStatus
> .apply("Reload_MonthlyS2Stats_FromBQ", new ReadStatsS1())
> .apply("View_S1STATS", View.asSingleton());
>
> /**
>  * Get PCollectionView of Stats2
>  */
> PCollectionView> stats2View =
> jobCompleteStatus
> .apply("Reload_OptimalAppCharsInfo_FromBQ", new ReadStatsS2())
> .apply("View_S2STATS", View.asSingleton());
>
>
> and a couple more like these PTransforms. Here jobCompleteStatus is a message
>
> received from PubSub that act as a trigger to reload these views.
>
> The moment I deployed the above pipeline, it didn't start and
>
> error reporting gave weird exceptions (see the attached screenshots) which I
> don't know how to debug.
>
>
> Then as an experiment I made a change where I enabled only one new 
> transformation
>
> and disabled others. This time I didn't see any issue.
>
> So it looks like some memory issue.
>
> I also compared worker logs between working case and non working case
>
> and it looks like resources were not granted in the non-working case.
>
> (See attached working-workerlogs and nonworking-workerlogs)
>
> I couldn't find any other log.
>
>
> I would really appreciate quick help here.
>
>
> Thanks and Regards
>
> Mohil
>
>
>


Re: Need suggestion/help for use case (usage of the side input pattern and sliding window)

2020-06-03 Thread Mohil Khare
Cool. Thanks again for your help/suggestions, Luke.
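For anyone else reading this thread, a minimal sketch of the "slowly updating
side input" pattern Luke points to below. The refresh period, the
loadScoresFromExternalStore() helper, and the Log and Map<String, Long> types are
all assumptions here, not part of the actual pipeline.

PCollectionView<Map<String, Long>> scoresView = p
    .apply("Tick", GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
    .apply("Load_Scores", ParDo.of(new DoFn<Long, Map<String, Long>>() {
      @ProcessElement
      public void process(ProcessContext c) {
        c.output(loadScoresFromExternalStore());  // placeholder for the actual lookup
      }
    }))
    .apply("Global_Window_Latest", Window.<Map<String, Long>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply("As_Singleton", View.asSingleton());

// Main path: decorate each element with the latest snapshot of scores.
mainLogs.apply("Attach_Score", ParDo.of(new DoFn<Log, Log>() {
  @ProcessElement
  public void process(ProcessContext c) {
    Map<String, Long> scores = c.sideInput(scoresView);
    // ... use scores.get(key) to decorate c.element() ...
    c.output(c.element());
  }
}).withSideInputs(scoresView));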

Regards
Mohil

On Tue, Jun 2, 2020 at 10:42 AM Luke Cwik  wrote:

> Using side inputs is fine and is a common pattern. You should take a look
> at "slowly changing side inputs"[1] as there is some example code there.
>
> 1:
> https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-side-input-using-windowing
>
> On Mon, Jun 1, 2020 at 8:27 PM Mohil Khare  wrote:
>
>> Thanks Luke for your reply.
>> I see. I am trying to recall why I added allowedLateness as 360 days.
>> Anyways I will try without that.
>>
>> But do you think the approach I am using to keep getting a running score
>> in a sliding window and then using it as a side input to decorate the main
>> log  is correct ? Or I can achieve same thing is a much better and
>> optimized way.
>>
>> Thanks again
>> Mohil
>>
>> On Mon, Jun 1, 2020 at 3:38 PM Luke Cwik  wrote:
>>
>>> Your allowed lateness is 360 days and since the trigger you have doesn't
>>> emit speculative results, you'll have to wait till the watermark advances
>>> to the end of windows timestamp + 360 days before something is output from
>>> the grouping aggregation/available at the side input.
>>>
>>>
>>> On Sat, May 30, 2020 at 12:17 PM Mohil Khare  wrote:
>>>
>>>> Hello all,
>>>>
>>>> Any suggestions? Where am I going wrong or is there any better way of
>>>> achieving this so that I can do replay as well ?
>>>>
>>>> Thanks
>>>> Mohil
>>>>
>>>> On Wed, May 27, 2020 at 11:40 AM Mohil Khare  wrote:
>>>>
>>>>> Hi everyone,
>>>>> I need a suggestion regarding usage of the side input pattern and
>>>>> sliding window, especially while replaying old kafka logs/offsets.
>>>>>
>>>>> FYI: I am running beam 2.19 on google dataflow.
>>>>>
>>>>> I have a use case where I read a continuous stream of data from Kafka
>>>>> and need to calculate one score (apart from other calculations) per key
>>>>> which is based on the number of such requests that are received per key in
>>>>> the last one hour.
>>>>>
>>>>> Roughly my code looks like following:
>>>>>
>>>>> PCollection = p
>>>>> .apply("Read__Logs_From_Kafka", KafkaIO.read()
>>>>> .withBootstrapServers(String.join(",", 
>>>>> bootstrapServerToConnectTo))
>>>>> .withTopic("app_access_stats")
>>>>> .withKeyDeserializer(StringDeserializer.class)
>>>>> .withValueDeserializer(ByteArrayDeserializer.class)
>>>>> .withConsumerConfigUpdates(kafkaConsumerProperties)
>>>>> .withConsumerFactoryFn(consumerFactoryObj)
>>>>> .commitOffsetsInFinalize())
>>>>> .apply("Applying_Fixed_Window_Logs", Window.>>>> byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>>> 
>>>>> .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1
>>>>> .withAllowedLateness(Duration.standardDays(380))
>>>>> .discardingFiredPanes())
>>>>> .apply("Convert_KafkaRecord_To_PCollection",
>>>>> ParDo.of(new ParseKafkaLogs()));
>>>>>
>>>>>
>>>>> /*** Class that handles incoming PCollection and calculate score 
>>>>> ***/
>>>>>
>>>>> /**. Assumeinput = incoming PCollection as created above
>>>>>
>>>>> PCollectionView> slidingWindowHourlyUserRequestsPerKeyView
>>>>>
>>>>>= input.apply("Calculate_Total_UserRequests_Past_1Hr", new 
>>>>> WindowedNumUserRequestsPerKey()).apply(View.asMap());
>>>>>
>>>>> /**Calculate Running sum of num of reqs in sliding window
>>>>>
>>>>> Starting sliding window of duration 1 hr every 1 sec so that we can 
>>>>> get accurate result of past 1 hr
>>>>>
>>>>> **/
>>>>>
>>>>>
>>>>> private static class WindowedNumUserRequestsPerKey extends 
>>>>> PTransform, PCollection>> {
>>>>>
>>>>> @Override
>>>>> publ

Re: KafkaIO write in case on topic name present in PCollection

2020-06-02 Thread Mohil Khare
Hey Alexey,

Thanks a lot for your quick response. This worked for me :). Awesome.

Regards
Mohil


On Tue, Jun 2, 2020 at 6:31 AM Alexey Romanenko 
wrote:

> Hi Mohil,
>
> In Java SDK you can use “KafkaIO.writeRecords()” for that. So, you will
> need to provide a PCollection> as an input collection
> where you set a desired output topic for every record inside ProducerRecord
> metadata.
> It could look something like this:
>
> PCollection>  teams = ...;
>
> PCollection> records = teams.apply(ParDo.
> of(new KV2ProducerRecord());
>
> private static class KV2ProducerRecord
> extends DoFn, ProducerRecord> {
> // create ProducerRecord and set your topic there
> ...
> }
>
> records.apply(KafkaIO.writeRecords()
> .withBootstrapServers("broker_1:9092,broker_2:9092")
> .withTopic("results”) // default sink topic
> .withKeySerializer(...)
> .withValueSerializer(...));
>
>
>
> On 2 Jun 2020, at 03:27, Mohil Khare  wrote:
>
> Hello everyone,
>
> Does anyone know if  it is possible to provide a topic name embedded in a
> PCollection object to kafkaIO while writing ?
>
> We have a use case where we have a team specific kafka topic for eg
> teamA_topicname, teamB_topicname.
>
> From beam, we create PCollection> and we need to send
> this data to kafka over aforementioned team specific topics.
> Is it possible to provide topic names dynamically to
> kafkaIO.write().withTopic() from Key  present in KV PCollection ?
>
> Thanks and regards
> Mohil
>
>
>
>
>
>
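For readers landing on this thread, a self-contained sketch of the dynamic-topic
write that Alexey describes above. The "<team>_topicname" convention, the
String/String types, the brokers, and the explicit ProducerRecord coder are
assumptions on my part; teams is the KV<String, String> collection mentioned in
the question.

static class KV2ProducerRecord extends DoFn<KV<String, String>, ProducerRecord<String, String>> {
  @ProcessElement
  public void process(@Element KV<String, String> element,
                      OutputReceiver<ProducerRecord<String, String>> out) {
    // Route each record to its team-specific topic, e.g. "teamA_topicname".
    String topic = element.getKey() + "_topicname";
    out.output(new ProducerRecord<>(topic, element.getKey(), element.getValue()));
  }
}

teams
    .apply("To_ProducerRecords", ParDo.of(new KV2ProducerRecord()))
    // Depending on the SDK version, the ProducerRecord coder may need to be set explicitly.
    .setCoder(ProducerRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
    .apply("Write_To_Kafka", KafkaIO.<String, String>writeRecords()
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        .withTopic("results")  // default topic, used when a record carries no topic
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class));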


Re: Need suggestion/help for use case (usage of the side input pattern and sliding window)

2020-06-01 Thread Mohil Khare
Thanks Luke for your reply.
I see. I am trying to recall why I added allowedLateness as 360 days.
Anyway, I will try without that.

But do you think the approach I am using, computing a running score in
a sliding window and then using it as a side input to decorate the main
log, is correct? Or can I achieve the same thing in a much better and
more optimized way?

Thanks again
Mohil

On Mon, Jun 1, 2020 at 3:38 PM Luke Cwik  wrote:

> Your allowed lateness is 360 days and since the trigger you have doesn't
> emit speculative results, you'll have to wait till the watermark advances
> to the end of windows timestamp + 360 days before something is output from
> the grouping aggregation/available at the side input.
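For reference, if speculative (early) output is wanted from that sliding-window aggregation
before the watermark-based firing, the trigger in the quoted code below could be given early
firings along these lines; the 30-second processing-time delay and dropping the 360-day
lateness are assumptions, not a recommendation for the replay case:

input
    .apply("Applying_Sliding_Window_1Hr_Every1sec",
        Window.<POJO>into(
                SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(30))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())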
>
>
> On Sat, May 30, 2020 at 12:17 PM Mohil Khare  wrote:
>
>> Hello all,
>>
>> Any suggestions? Where am I going wrong or is there any better way of
>> achieving this so that I can do replay as well ?
>>
>> Thanks
>> Mohil
>>
>> On Wed, May 27, 2020 at 11:40 AM Mohil Khare  wrote:
>>
>>> Hi everyone,
>>> I need a suggestion regarding usage of the side input pattern and
>>> sliding window, especially while replaying old kafka logs/offsets.
>>>
>>> FYI: I am running beam 2.19 on google dataflow.
>>>
>>> I have a use case where I read a continuous stream of data from Kafka
>>> and need to calculate one score (apart from other calculations) per key
>>> which is based on the number of such requests that are received per key in
>>> the last one hour.
>>>
>>> Roughly my code looks like following:
>>>
>>> PCollection = p
>>> .apply("Read__Logs_From_Kafka", KafkaIO.read()
>>> .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
>>> .withTopic("app_access_stats")
>>> .withKeyDeserializer(StringDeserializer.class)
>>> .withValueDeserializer(ByteArrayDeserializer.class)
>>> .withConsumerConfigUpdates(kafkaConsumerProperties)
>>> .withConsumerFactoryFn(consumerFactoryObj)
>>> .commitOffsetsInFinalize())
>>> .apply("Applying_Fixed_Window_Logs", Window.>> byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>>> 
>>> .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1
>>> .withAllowedLateness(Duration.standardDays(380))
>>> .discardingFiredPanes())
>>> .apply("Convert_KafkaRecord_To_PCollection",
>>> ParDo.of(new ParseKafkaLogs()));
>>>
>>>
>>> /*** Class that handles incoming PCollection and calculate score ***/
>>>
>>> /**. Assumeinput = incoming PCollection as created above
>>>
>>> PCollectionView> slidingWindowHourlyUserRequestsPerKeyView
>>>
>>>= input.apply("Calculate_Total_UserRequests_Past_1Hr", new 
>>> WindowedNumUserRequestsPerKey()).apply(View.asMap());
>>>
>>> /**Calculate Running sum of num of reqs in sliding window
>>>
>>> Starting sliding window of duration 1 hr every 1 sec so that we can get 
>>> accurate result of past 1 hr
>>>
>>> **/
>>>
>>>
>>> private static class WindowedNumUserRequestsPerKey extends 
>>> PTransform, PCollection>> {
>>>
>>> @Override
>>> public PCollection> expand(PCollection input) {
>>>
>>> return input
>>> .apply("Applying_Sliding_Window_1Hr_Every1sec", 
>>> Window.into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
>>> 
>>> .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())).withAllowedLateness(Duration.standardDays(360)).discardingFiredPanes())
>>> .apply("Grouping_per_Key", ParDo.of(new 
>>> GroupByAggregationKey()))
>>> .apply("Total_Requests_Per_Key", Combine.perKey(new 
>>> CalculateTotalUserRequestsPerKey()));
>>> }
>>>
>>> private static class GroupByAggregationKey extends DoFn>> POJO>> {
>>> @ProcessElement
>>> public void processElement(@Element POJO input, 
>>> OutputReceiver> out) {
>>> /** code that emits required KV /
>>>
>>> }
>>> }
>>>
>>> private static class CalculateTotalUserRequestsPerKey ext

KafkaIO write in case on topic name present in PCollection

2020-06-01 Thread Mohil Khare
Hello everyone,

Does anyone know if  it is possible to provide a topic name embedded in a
PCollection object to kafkaIO while writing ?

We have a use case where we have a team specific kafka topic for eg
teamA_topicname, teamB_topicname.

From beam, we create a PCollection of KV pairs and we need to send
this data to kafka over the aforementioned team specific topics.
Is it possible to provide topic names dynamically to
kafkaIO.write().withTopic() from the key present in the KV PCollection?

Thanks and regards
Mohil


Re: Need suggestion/help for use case (usage of the side input pattern and sliding window)

2020-05-30 Thread Mohil Khare
Hello all,

Any suggestions? Where am I going wrong, or is there any better way of
achieving this so that I can do replay as well?

Thanks
Mohil

On Wed, May 27, 2020 at 11:40 AM Mohil Khare  wrote:

> Hi everyone,
> I need a suggestion regarding usage of the side input pattern and sliding
> window, especially while replaying old kafka logs/offsets.
>
> FYI: I am running beam 2.19 on google dataflow.
>
> I have a use case where I read a continuous stream of data from Kafka and
> need to calculate one score (apart from other calculations) per key  which
> is based on the number of such requests that are received per key in the
> last one hour.
>
> Roughly my code looks like following:
>
> PCollection = p
> .apply("Read__Logs_From_Kafka", KafkaIO.read()
> .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
> .withTopic("app_access_stats")
> .withKeyDeserializer(StringDeserializer.class)
> .withValueDeserializer(ByteArrayDeserializer.class)
> .withConsumerConfigUpdates(kafkaConsumerProperties)
> .withConsumerFactoryFn(consumerFactoryObj)
> .commitOffsetsInFinalize())
> .apply("Applying_Fixed_Window_Logs", Window. byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
> 
> .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1
> .withAllowedLateness(Duration.standardDays(380))
> .discardingFiredPanes())
> .apply("Convert_KafkaRecord_To_PCollection",
> ParDo.of(new ParseKafkaLogs()));
>
>
> /*** Class that handles incoming PCollection and calculate score ***/
>
> /**. Assumeinput = incoming PCollection as created above
>
> PCollectionView> slidingWindowHourlyUserRequestsPerKeyView
>
>= input.apply("Calculate_Total_UserRequests_Past_1Hr", new 
> WindowedNumUserRequestsPerKey()).apply(View.asMap());
>
> /**Calculate Running sum of num of reqs in sliding window
>
> Starting sliding window of duration 1 hr every 1 sec so that we can get 
> accurate result of past 1 hr
>
> **/
>
>
> private static class WindowedNumUserRequestsPerKey extends 
> PTransform, PCollection>> {
>
> @Override
> public PCollection> expand(PCollection input) {
>
> return input
> .apply("Applying_Sliding_Window_1Hr_Every1sec", 
> Window.into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
> 
> .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())).withAllowedLateness(Duration.standardDays(360)).discardingFiredPanes())
> .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
> .apply("Total_Requests_Per_Key", Combine.perKey(new 
> CalculateTotalUserRequestsPerKey()));
> }
>
> private static class GroupByAggregationKey extends DoFn POJO>> {
> @ProcessElement
> public void processElement(@Element POJO input, 
> OutputReceiver> out) {
> /** code that emits required KV /
>
> }
> }
>
> private static class CalculateTotalUserRequestsPerKey extends 
> Combine.CombineFn CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {
> private static class TotalRequestsAccumulator implements Serializable 
> {
> private long num_requests_running_sum = 0;
>
> TotalRequestsAccumulator(long num_requests_running_sum) {
> this.num_requests_running_sum = num_requests_running_sum;
> }
>
> @Override
> public boolean equals(Object o) {
> if (this == o) return true;
> if (!(o instanceof TotalRequestsAccumulator)) return false;
> TotalRequestsAccumulator that = (TotalRequestsAccumulator) o;
> return num_requests_running_sum == 
> that.num_requests_running_sum;
> }
>
> @Override
> public int hashCode() {
> return Objects.hash(num_requests_running_sum);
> }
> }
>
> @Override
> public TotalRequestsAccumulator createAccumulator() {
> return new TotalRequestsAccumulator(0);
> }
>
> @Override
> public TotalRequestsAccumulator addInput(TotalRequestsAccumulator 
> mutableAccumulator, POJO input) {
> mutableAccumulator.num_requests_running_sum++;
> return mutableAccumulator;
> }
>
> @Override
> public TotalRequestsAccumulator 

Need suggestion/help for use case (usage of the side input pattern and sliding window)

2020-05-27 Thread Mohil Khare
Hi everyone,
I need a suggestion regarding usage of the side input pattern and sliding
window, especially while replaying old kafka logs/offsets.

FYI: I am running beam 2.19 on google dataflow.

I have a use case where I read a continuous stream of data from Kafka and
need to calculate one score (apart from other calculations) per key  which
is based on the number of such requests that are received per key in the
last one hour.

Roughly my code looks like following:

PCollection<POJO> input = p
.apply("Read__Logs_From_Kafka", KafkaIO.read()
.withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
.withTopic("app_access_stats")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class)
.withConsumerConfigUpdates(kafkaConsumerProperties)
.withConsumerFactoryFn(consumerFactoryObj)
.commitOffsetsInFinalize())
.apply("Applying_Fixed_Window_Logs", Window.>into(FixedWindows.of(Duration.standardSeconds(10)))

.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1
.withAllowedLateness(Duration.standardDays(380))
.discardingFiredPanes())
.apply("Convert_KafkaRecord_To_PCollection",
ParDo.of(new ParseKafkaLogs()));
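For completeness, a ParseKafkaLogs along these lines would be a DoFn from the KafkaRecord to
the POJO used downstream; the parseFrom() call is a placeholder, since the payload format is
not shown here:

private static class ParseKafkaLogs extends DoFn<KafkaRecord<String, byte[]>, POJO> {
  @ProcessElement
  public void processElement(@Element KafkaRecord<String, byte[]> record,
                             OutputReceiver<POJO> out) {
    // Deserialize the Kafka payload bytes into the POJO; parseFrom() stands in for
    // whatever JSON/Avro/proto parsing the pipeline actually uses.
    out.output(POJO.parseFrom(record.getKV().getValue()));
  }
}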


/*** Class that handles incoming PCollection and calculate score ***/

/** Assume input = incoming PCollection<POJO> as created above

PCollectionView<Map<String, Long>> slidingWindowHourlyUserRequestsPerKeyView

   = input.apply("Calculate_Total_UserRequests_Past_1Hr", new
WindowedNumUserRequestsPerKey()).apply(View.asMap());

/**Calculate Running sum of num of reqs in sliding window

Starting sliding window of duration 1 hr every 1 sec so that we
can get accurate result of past 1 hr

**/


private static class WindowedNumUserRequestsPerKey extends
PTransform<PCollection<POJO>, PCollection<KV<String, Long>>> {

@Override
public PCollection<KV<String, Long>> expand(PCollection<POJO> input) {

return input
.apply("Applying_Sliding_Window_1Hr_Every1sec",
Window.into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))

.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())).withAllowedLateness(Duration.standardDays(360)).discardingFiredPanes())
.apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
.apply("Total_Requests_Per_Key", Combine.perKey(new
CalculateTotalUserRequestsPerKey()));
}

private static class GroupByAggregationKey extends DoFn<POJO, KV<String, POJO>> {
@ProcessElement
public void processElement(@Element POJO input,
OutputReceiver<KV<String, POJO>> out) {
/** code that emits required KV */

}
}

private static class CalculateTotalUserRequestsPerKey extends
Combine.CombineFn<POJO, CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {
private static class TotalRequestsAccumulator implements Serializable {
private long num_requests_running_sum = 0;

TotalRequestsAccumulator(long num_requests_running_sum) {
this.num_requests_running_sum = num_requests_running_sum;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof TotalRequestsAccumulator)) return false;
TotalRequestsAccumulator that = (TotalRequestsAccumulator) o;
return num_requests_running_sum ==
that.num_requests_running_sum;
}

@Override
public int hashCode() {
return Objects.hash(num_requests_running_sum);
}
}

@Override
public TotalRequestsAccumulator createAccumulator() {
return new TotalRequestsAccumulator(0);
}

@Override
public TotalRequestsAccumulator
addInput(TotalRequestsAccumulator mutableAccumulator, POJO input) {
mutableAccumulator.num_requests_running_sum++;
return mutableAccumulator;
}

@Override
public TotalRequestsAccumulator
mergeAccumulators(Iterable<TotalRequestsAccumulator> accumulators) {
TotalRequestsAccumulator merged = createAccumulator();
for (TotalRequestsAccumulator accumulator : accumulators) {
merged.num_requests_running_sum +=
accumulator.num_requests_running_sum;
}
return merged;
}

@Override
public Long extractOutput(TotalRequestsAccumulator accumulator) {
Long totalUserRequestsPerKey = accumulator.num_requests_running_sum;
return totalUserRequestsPerKey;
}
}
}

Now I calculate the score in the incoming POJO by using
slidingWindowHourlyUserRequestsPerKeyView as side input.

input.apply("Add_Score", ParDo.of(new AddScore())
.withSideInputs(slidingWindowHourlyUserRequestsPerKeyView));
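A minimal sketch of what AddScore might look like; getAggregationKey() and the scoring logic
are placeholders, and with this shape the ParDo above would be constructed as
new AddScore(slidingWindowHourlyUserRequestsPerKeyView):

private static class AddScore extends DoFn<POJO, POJO> {
  private final PCollectionView<Map<String, Long>> requestsPerKeyView;

  AddScore(PCollectionView<Map<String, Long>> requestsPerKeyView) {
    this.requestsPerKeyView = requestsPerKeyView;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    POJO element = c.element();
    // Look up the running 1-hour request count for this element's key in the side-input map.
    Map<String, Long> hourlyRequestsPerKey = c.sideInput(requestsPerKeyView);
    Long requestsLastHour =
        hourlyRequestsPerKey.getOrDefault(element.getAggregationKey(), 0L);
    // Compute the score from requestsLastHour, set it on the element, then emit it.
    c.output(element);
  }
}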


The above seems to be working fine, though I would appreciate a suggestion on
whether there is a better way of achieving this.

Also, I start getting problems when we have to stop the beam for a couple
of hours 

Re: Timers exception on "Job Drain" while using stateful beam processing in global window

2020-05-20 Thread Mohil Khare
Hi Reuven, all,

I have opened following jira to track this issue:

https://issues.apache.org/jira/browse/BEAM-10053

Thanks and Regards
Mohil

On Mon, May 18, 2020 at 12:28 PM Mohil Khare  wrote:

> Hi Reuven,
>
> Thanks for your reply. Well, I haven't filed JIRA yet. But if it looks
> like a bug at beam's end, I will file jira for this.
>
> Regards
> Mohil
>
> On Mon, May 18, 2020 at 12:26 PM Reuven Lax  wrote:
>
>> However this still appears to be a bug - that exception should never be
>> thrown inside the Dataflow runner. Are you able to file a JIRA for this bug?
>>
>> On Mon, May 18, 2020 at 10:44 AM Robert Bradshaw 
>> wrote:
>>
>>> Glad you were able to get this working; thanks for following up.
>>>
>>> On Mon, May 18, 2020 at 10:35 AM Mohil Khare  wrote:
>>>
>>>> Hi,
>>>> On another note, I think I was unnecessarily complicating things by
>>>> applying a sliding window here and then an extra global window to remove
>>>> duplicates.  I replaced the *sliding window with a session window *(*as
>>>> I know that my transaction consisting of recordA logs and recordB logs for
>>>> a key "MyKey" won't last for more than 60-90 secs*), and my use case
>>>> seems to be working fine. Even DRAIN is working successfully.
>>>>
>>>> Thanks
>>>> Mohil
>>>>
>>>> On Sun, May 17, 2020 at 3:37 PM Mohil Khare  wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have a use case where I have two sets of PCollections (RecordA and
>>>>> RecordB) coming from a real time streaming source like Kafka.
>>>>>
>>>>> Both Records are correlated with a common key, let's say KEY.
>>>>>
>>>>> The purpose is to enrich RecordA with RecordB's data for which I am
>>>>> using CoGbByKey. Since RecordA and RecordB for a common key can come 
>>>>> within
>>>>> 1-2 minutes of event time, I am maintaining a sliding window for both
>>>>> records and then do CoGpByKey for both PCollections.
>>>>>
>>>>> The sliding windows that will find both RecordA and RecordB for a
>>>>> common key KEY, will emit enriched output. Now, since multiple sliding
>>>>> windows can emit the same output, I finally remove duplicate results by
>>>>> feeding aforementioned outputs to a global window where I maintain a state
>>>>> to check whether output has already been processed or not. Since it is a
>>>>> global window, I maintain a Timer on state (for GC) to let it expire after
>>>>> 10 minutes have elapsed since state has been written.
>>>>>
>>>>> This is working perfectly fine w.r.t the expected results. However, I
>>>>> am unable to stop job gracefully i.e. Drain the job gracefully. I see
>>>>> following exception:
>>>>>
>>>>> java.lang.IllegalStateException:
>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn@16b089a6
>>>>> received state cleanup timer for window
>>>>> org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that
>>>>> is before the appropriate cleanup time 294248-01-24T04:00:54.776Z
>>>>>
>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.Strea

Re: Timers exception on "Job Drain" while using stateful beam processing in global window

2020-05-18 Thread Mohil Khare
Hi Reuven,

Thanks for your reply. Well, I haven't filed JIRA yet. But if it looks like
a bug at beam's end, I will file jira for this.

Regards
Mohil

On Mon, May 18, 2020 at 12:26 PM Reuven Lax  wrote:

> However this still appears to be a bug - that exception should never be
> thrown inside the Dataflow runner. Are you able to file a JIRA for this bug?
>
> On Mon, May 18, 2020 at 10:44 AM Robert Bradshaw 
> wrote:
>
>> Glad you were able to get this working; thanks for following up.
>>
>> On Mon, May 18, 2020 at 10:35 AM Mohil Khare  wrote:
>>
>>> Hi,
>>> On another note, I think I was unnecessarily complicating things by
>>> applying a sliding window here and then an extra global window to remove
>>> duplicates.  I replaced the *sliding window with a session window *(*as
>>> I know that my transaction consisting of recordA logs and recordB logs for
>>> a key "MyKey" won't last for more than 60-90 secs*), and my use case
>>> seems to be working fine. Even DRAIN is working successfully.
>>>
>>> Thanks
>>> Mohil
>>>
>>> On Sun, May 17, 2020 at 3:37 PM Mohil Khare  wrote:
>>>
>>>> Hello,
>>>>
>>>> I have a use case where I have two sets of PCollections (RecordA and
>>>> RecordB) coming from a real time streaming source like Kafka.
>>>>
>>>> Both Records are correlated with a common key, let's say KEY.
>>>>
>>>> The purpose is to enrich RecordA with RecordB's data for which I am
>>>> using CoGbByKey. Since RecordA and RecordB for a common key can come within
>>>> 1-2 minutes of event time, I am maintaining a sliding window for both
>>>> records and then do CoGpByKey for both PCollections.
>>>>
>>>> The sliding windows that will find both RecordA and RecordB for a
>>>> common key KEY, will emit enriched output. Now, since multiple sliding
>>>> windows can emit the same output, I finally remove duplicate results by
>>>> feeding aforementioned outputs to a global window where I maintain a state
>>>> to check whether output has already been processed or not. Since it is a
>>>> global window, I maintain a Timer on state (for GC) to let it expire after
>>>> 10 minutes have elapsed since state has been written.
>>>>
>>>> This is working perfectly fine w.r.t the expected results. However, I
>>>> am unable to stop job gracefully i.e. Drain the job gracefully. I see
>>>> following exception:
>>>>
>>>> java.lang.IllegalStateException:
>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn@16b089a6
>>>> received state cleanup timer for window
>>>> org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is
>>>> before the appropriate cleanup time 294248-01-24T04:00:54.776Z
>>>>
>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
>>>>
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> java.lang.Thread.run(Thread.java:745)
>>>> java.lang.IllegalStateException:
>>>> org.a

Re: Timers exception on "Job Drain" while using stateful beam processing in global window

2020-05-18 Thread Mohil Khare
Hi,
On another note, I think I was unnecessarily complicating things by
applying a sliding window here and then an extra global window to remove
duplicates.  I replaced the *sliding window with a session window *(*as I
know that my transaction consisting of recordA logs and recordB logs for a
key "MyKey" won't last for more than 60-90 secs*), and my use case seems to
be working fine. Even DRAIN is working successfully.
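For reference, a session-window setup along those lines might look roughly like this;
RecordA, the String key, the variable names and the 2-minute gap (chosen to cover the
60-90 second transaction) are all illustrative:

PCollection<KV<String, RecordA>> sessionedRecordA = keyedRecordA
    .apply("Session_Window_RecordA",
        Window.<KV<String, RecordA>>into(
                Sessions.withGapDuration(Duration.standardMinutes(2)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes());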

Thanks
Mohil

On Sun, May 17, 2020 at 3:37 PM Mohil Khare  wrote:

> Hello,
>
> I have a use case where I have two sets of PCollections (RecordA and
> RecordB) coming from a real time streaming source like Kafka.
>
> Both Records are correlated with a common key, let's say KEY.
>
> The purpose is to enrich RecordA with RecordB's data for which I am using
> CoGbByKey. Since RecordA and RecordB for a common key can come within 1-2
> minutes of event time, I am maintaining a sliding window for both records
> and then do CoGpByKey for both PCollections.
>
> The sliding windows that will find both RecordA and RecordB for a common
> key KEY, will emit enriched output. Now, since multiple sliding windows can
> emit the same output, I finally remove duplicate results by feeding
> aforementioned outputs to a global window where I maintain a state to check
> whether output has already been processed or not. Since it is a global
> window, I maintain a Timer on state (for GC) to let it expire after 10
> minutes have elapsed since state has been written.
>
> This is working perfectly fine w.r.t the expected results. However, I am
> unable to stop job gracefully i.e. Drain the job gracefully. I see
> following exception:
>
> java.lang.IllegalStateException:
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn@16b089a6 received
> state cleanup timer for window
> org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is
> before the appropriate cleanup time 294248-01-24T04:00:54.776Z
>
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
>
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
>
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
>
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
>
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
>
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> java.lang.IllegalStateException:
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn@59902a10 received
> state cleanup timer for window
> org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is
> before the appropriate cleanup time 294248-01-24T04:00:54.776Z
>
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
>
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
>
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
>
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
>
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
>
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflow

Timers exception on "Job Drain" while using stateful beam processing in global window

2020-05-17 Thread Mohil Khare
Hello,

I have a use case where I have two sets of PCollections (RecordA and
RecordB) coming from a real time streaming source like Kafka.

Both Records are correlated with a common key, let's say KEY.

The purpose is to enrich RecordA with RecordB's data for which I am using
CoGbByKey. Since RecordA and RecordB for a common key can come within 1-2
minutes of event time, I am maintaining a sliding window for both records
and then do CoGpByKey for both PCollections.

The sliding windows that will find both RecordA and RecordB for a common
key KEY, will emit enriched output. Now, since multiple sliding windows can
emit the same output, I finally remove duplicate results by feeding
aforementioned outputs to a global window where I maintain a state to check
whether output has already been processed or not. Since it is a global
window, I maintain a Timer on state (for GC) to let it expire after 10
minutes have elapsed since state has been written.
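A rough sketch of the kind of stateful de-duplication step described above, assuming the
enriched output is keyed by a String id; EnrichedRecord, the key choice, and the use of
processing time for the 10-minute GC timer are all assumptions, not the actual code:

private static class DedupEnrichedOutput
    extends DoFn<KV<String, EnrichedRecord>, EnrichedRecord> {

  @StateId("seen")
  private final StateSpec<ValueState<Boolean>> seenSpec = StateSpecs.value(BooleanCoder.of());

  @TimerId("gc")
  private final TimerSpec gcSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void processElement(ProcessContext c,
                             @StateId("seen") ValueState<Boolean> seen,
                             @TimerId("gc") Timer gc) {
    if (Boolean.TRUE.equals(seen.read())) {
      return;  // already emitted by another sliding window; drop the duplicate
    }
    seen.write(true);
    // Garbage-collect this key's state 10 minutes after it was written.
    gc.offset(Duration.standardMinutes(10)).setRelative();
    c.output(c.element().getValue());
  }

  @OnTimer("gc")
  public void onGc(@StateId("seen") ValueState<Boolean> seen) {
    seen.clear();
  }
}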

This is working perfectly fine w.r.t. the expected results. However, I am
unable to stop the job gracefully, i.e. drain the job gracefully. I see the
following exception:

java.lang.IllegalStateException:
org.apache.beam.runners.dataflow.worker.SimpleParDoFn@16b089a6 received
state cleanup timer for window
org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is
before the appropriate cleanup time 294248-01-24T04:00:54.776Z
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
java.lang.IllegalStateException:
org.apache.beam.runners.dataflow.worker.SimpleParDoFn@59902a10 received
state cleanup timer for window
org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is
before the appropriate cleanup time 294248-01-24T04:00:54.776Z
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
java.lang.IllegalStateException:
org.apache.beam.runners.dataflow.worker.SimpleParDoFn@4316932b received
state cleanup timer for window
*org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210
that is before the appropriate cleanup time 294248-01-24T04:00:54.776Z*
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)

Re: Using Self signed root ca for https connection in eleasticsearchIO

2020-05-17 Thread Mohil Khare
Hello Luke,

Thanks for your reply, and I apologize for the late response.
Well, I already tried using JvmInitializer and for some reason it didn't
work for me. Quite possibly I was not using it correctly. Do you have
any code snippets that show how we can use it with a PTransform?

My elasticsearch PTransform look like following:

class WriteToElasticSearch extends PTransform<PCollection<MyPojo>, PDone> {
//private static final Logger logger =
LoggerFactory.getLogger(WriteAppAccessLogToElasticSearch.class);
private final String[] elasticsearchEndPoint;
private final String username;
private final String password;

WriteToElasticSearch(String[] elasticsearchEndPoint, String
username, String password) {
this.elasticsearchEndPoint = elasticsearchEndPoint;
this.username = username;
this.password = password;

}
@Override
public PDone expand(PCollection<MyPojo> input) {
input
.apply("Convert_PCollection to PCollection",
new MyPojoToString())
.apply("Global_Window_Trigger_Write_With_Every_Element",
Window.into(new GlobalWindows())

.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.discardingFiredPanes()
)
.apply("Write_To_Elastic_Search",
ElasticsearchIO.write().withConnectionConfiguration(

ElasticsearchIO.ConnectionConfiguration.create(elasticsearchEndPoint,
"index_write", "_doc").withUsername(username).withPassword(password))
);
return PDone.in(input.getPipeline());
}
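For reference, the JvmInitializer approach mentioned in the quoted reply below might be wired
up roughly like this; the @AutoService registration (which needs the auto-service dependency,
or a hand-written META-INF/services entry), the GCS path, the local file name and the
readJKSFileFromGCS helper are all assumptions:

import com.google.auto.service.AutoService;
import org.apache.beam.sdk.harness.JvmInitializer;
import org.apache.beam.sdk.options.PipelineOptions;

@AutoService(JvmInitializer.class)
public class TrustStoreInitializer implements JvmInitializer {
  @Override
  public void beforeProcessing(PipelineOptions options) {
    // Stage the self-signed truststore from GCS onto the worker's local disk, then point
    // the JVM at it before any HTTPS connection is opened.
    // readJKSFileFromGCS(...) is the helper from the earlier Kafka snippet (placeholder here).
    readJKSFileFromGCS("gs://my-bucket/truststore.jks", "/tmp/es.client.truststore.jks");
    System.setProperty("javax.net.ssl.trustStore", "/tmp/es.client.truststore.jks");
    System.setProperty("javax.net.ssl.trustStorePassword", "clientsecret");
  }
}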


Thanks and Regards
Mohil

On Thu, Apr 9, 2020 at 2:02 PM Luke Cwik  wrote:

> You should be able to use a JvmInitializer[1] to set any system
> properties/configure the JVM trust store. Just make sure it's properly set
> up in your META-INF/services.
>
> This is supported by Dataflow and all PortableRunners that use a separate
> process/container for the worker.
>
> 1:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/harness/JvmInitializer.java
>
> On Thu, Apr 9, 2020 at 10:02 AM Mohil Khare  wrote:
>
>> Hi Kenneth,
>>
>> Thanks for your reply. You are right, I also believe that this has more
>> to do with Dataflow than Elasticsearch. I don't think the problem is in
>> classpath or beam unable to find file in classpath. The problem is how to.
>> set worker VM's keystore and truststore with self signed root ca. Usually
>> they contain all trusted root CAs like letsencrypt, symantec etc. KafkaIO
>> provide that via withConsumerFactorFn(loadKafkaConfig) where you can do
>> something like following:
>>
>> private void loadKafkaConfig(Map config) {
>>
>> readJKSFileFromGCS(this.gcsTruststorePath, 
>> "/tmp/kafka.client.truststore.jks");
>> readJKSFileFromGCS(this.gcsKeystorePath, 
>> "/tmp/kafka.client.keystore.jks");
>>
>>
>> config.put("ssl.truststore.location","/tmp/kafka.client.truststore.jks");
>> config.put("ssl.truststore.password","clientsecret");
>> config.put("ssl.keystore.location","/tmp/kafka.client.keystore.jks");
>> config.put("ssl.keystore.password","clientsecret");
>> config.put("ssl.key.password","clientsecret");
>> }
>>
>>
>> I was wondering if ElasticIO can also provide similar support where we
>> can provide our self signed root ca.
>>
>> Thanks and Regards
>> Mohil
>>
>>
>> On Tue, Apr 7, 2020 at 8:14 AM Kenneth Knowles  wrote:
>>
>>> Hi Mohil,
>>>
>>> Thanks for the detailed report. I think most people are reduced capacity
>>> right now. Filing a Jira would be helpful for tracking this.
>>>
>>> Since I am writing, I will add a quick guess, but we should move to
>>> Jira. It seems this has more to do with Dataflow than ElasticSearch. The
>>> default for staging files is to scan the classpath. To do more, or to fix
>>> any problem with the autodetection, you will need to specify --filesToStage
>>> on the command line or setFilesToStage in Java code. Am I correct that this
>>> symptom is confirmed?
>>>
>>> Kenn
>>>
>>> On Mon, Apr 6, 2020 at 5:04 PM Mohil Khare  wrote:
>>>
>>>> Any update on this? Shall I open a jira for this support ?
>>>>
>>>> Thanks and regards
>>>> Mohil
>>>>
>>>> On Sun, Mar 22, 2020 at 9:36 PM Mohil Khare  wrote:
>>>>
>>>>> Hi,
>>>>> This is Mohil from Prosimo, a small bay area based stealth mode
&

Re: Keeping keys in a state for a very very long time (keys expiry unknown)

2020-05-17 Thread Mohil Khare
Hi Reza and others,
As suggested, I have opened
https://issues.apache.org/jira/browse/BEAM-10019 which
I think might be a good addition to beam pipeline patterns.

Thanks
Mohil

On Mon, Apr 6, 2020 at 6:28 PM Mohil Khare  wrote:

> Sure thing.. I would love to contribute.
>
> Thanks
> Mohil
>
>
>
> On Mon, Apr 6, 2020 at 6:17 PM Reza Ardeshir Rokni 
> wrote:
>
>> Great! BTW if you get the time and wanted to contribute back to beam
>> there is a nice section to record cool patterns:
>>
>> https://beam.apache.org/documentation/patterns/overview/
>>
>> This would make a great one!
>>
>> On Tue, 7 Apr 2020 at 09:12, Mohil Khare  wrote:
>>
>>> No ... that's a valid answer. Since I wanted to have a long window size
>>> per key and since we can't use state with session windows, I am using a
>>> sliding window for let's say 72 hrs which starts every hour.
>>>
>>> Thanks a lot Reza for your input.
>>>
>>> Regards
>>> Mohil
>>>
>>> On Mon, Apr 6, 2020 at 6:09 PM Reza Ardeshir Rokni 
>>> wrote:
>>>
>>>> Depends on the use case, Global state comes with the technical debt of
>>>> having to do your own GC, but comes with more control. You could
>>>> implement the pattern above with a long FixedWindow as well, which will
>>>> take care of the GC within the window  bound.
>>>>
>>>> Sorry, its not a yes / no answer :-)
>>>>
>>>> On Tue, 7 Apr 2020 at 09:03, Mohil Khare  wrote:
>>>>
>>>>> Thanks a lot Reza for your quick response. Yeah saving the data in an
>>>>> external system after timer expiry makes sense.
>>>>> So do you suggest using a global window for maintaining state ?
>>>>>
>>>>> Thanks and regards
>>>>> Mohil
>>>>>
>>>>> On Mon, Apr 6, 2020 at 5:37 PM Reza Ardeshir Rokni 
>>>>> wrote:
>>>>>
>>>>>> Are you able to make use of the following pattern?
>>>>>>
>>>>>> Store StateA-metadata until no activity for Duration X, you can use a
>>>>>> Timer to check this, then expire the value, but store in an
>>>>>> external system. If you get a record that does want this value after
>>>>>> expiry, call out to the external system and store the value again in key
>>>>>> StateA-metadata.
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> On Tue, 7 Apr 2020 at 08:03, Mohil Khare  wrote:
>>>>>>
>>>>>>> Hello all,
>>>>>>> We are attempting a implement a use case where beam (java sdk) reads
>>>>>>> two kind of records from data stream like Kafka:
>>>>>>>
>>>>>>> 1. Records of type A containing key and corresponding metadata.
>>>>>>> 2. Records of type B containing the same key, but no metadata. Beam
>>>>>>> then needs to fill metadata for records of type B  by doing a lookup for
>>>>>>> metadata using keys received in records of type A.
>>>>>>>
>>>>>>> Idea is to save metadata or rather state for keys received in
>>>>>>> records of type A and then do a lookup when records of type B are 
>>>>>>> received.
>>>>>>> I have implemented this using the "@State" construct of beam.
>>>>>>> However my problem is that we don't know when keys should expire. I 
>>>>>>> don't
>>>>>>> think keeping a global window will be a good idea as there could be many
>>>>>>> keys (may be millions over a period of time) to be saved in a state.
>>>>>>>
>>>>>>> What is the best way to achieve this? I was reading about RedisIO,
>>>>>>> but found that it is still in the experimental stage. Can someone please
>>>>>>> recommend any solution to achieve this.
>>>>>>>
>>>>>>> Thanks and regards
>>>>>>> Mohil
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>


Re: Using Self signed root ca for https connection in eleasticsearchIO

2020-04-09 Thread Mohil Khare
Hi Kenneth,

Thanks for your reply. You are right, I also believe that this has more to
do with Dataflow than Elasticsearch. I don't think the problem is the
classpath or beam being unable to find a file on the classpath. The problem is
how to set the worker VM's keystore and truststore with a self signed root ca.
Usually they contain all trusted root CAs like letsencrypt, symantec etc.
KafkaIO provides that via withConsumerFactoryFn(loadKafkaConfig), where you
can do something like the following:

private void loadKafkaConfig(Map<String, Object> config) {

readJKSFileFromGCS(this.gcsTruststorePath,
"/tmp/kafka.client.truststore.jks");
readJKSFileFromGCS(this.gcsKeystorePath, "/tmp/kafka.client.keystore.jks");


config.put("ssl.truststore.location","/tmp/kafka.client.truststore.jks");
config.put("ssl.truststore.password","clientsecret");
config.put("ssl.keystore.location","/tmp/kafka.client.keystore.jks");
config.put("ssl.keystore.password","clientsecret");
config.put("ssl.key.password","clientsecret");
}
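For completeness, the factory hook that a helper like the one above is typically plugged into
could look roughly like this; it assumes loadKafkaConfig is a static (or otherwise
serializable) method, and consumerFactoryObj matches the name passed to
withConsumerFactoryFn(consumerFactoryObj) in the read snippets elsewhere in these threads:

import java.util.Map;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;

SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryObj =
    config -> {
      // Stage the JKS files locally and add the ssl.* properties (see loadKafkaConfig above).
      loadKafkaConfig(config);
      // Hand KafkaIO a raw byte[]/byte[] consumer built from the updated config.
      return new KafkaConsumer<>(config);
    };

// ... KafkaIO.<String, byte[]>read().withConsumerFactoryFn(consumerFactoryObj) ...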


I was wondering if ElasticsearchIO can also provide similar support where we can
provide our self signed root ca.

Thanks and Regards
Mohil


On Tue, Apr 7, 2020 at 8:14 AM Kenneth Knowles  wrote:

> Hi Mohil,
>
> Thanks for the detailed report. I think most people are reduced capacity
> right now. Filing a Jira would be helpful for tracking this.
>
> Since I am writing, I will add a quick guess, but we should move to Jira.
> It seems this has more to do with Dataflow than ElasticSearch. The default
> for staging files is to scan the classpath. To do more, or to fix any
> problem with the autodetection, you will need to specify --filesToStage on
> the command line or setFilesToStage in Java code. Am I correct that this
> symptom is confirmed?
>
> Kenn
>
> On Mon, Apr 6, 2020 at 5:04 PM Mohil Khare  wrote:
>
>> Any update on this? Shall I open a jira for this support ?
>>
>> Thanks and regards
>> Mohil
>>
>> On Sun, Mar 22, 2020 at 9:36 PM Mohil Khare  wrote:
>>
>>> Hi,
>>> This is Mohil from Prosimo, a small bay area based stealth mode startup.
>>> We use Beam (on version 2.19) with google dataflow in our analytics
>>> pipeline with Kafka and PubSub as source while GCS, BigQuery and
>>> ElasticSearch as our sink.
>>>
>>> We want to use our private self signed root ca for tls connections
>>> between our internal services viz kafka, ElasticSearch, beam etc. We are
>>> able to setup secure tls connection between beam and kafka using self
>>> signed root certificate in keystore.jks and truststore.jks and transferring
>>> it to worker VMs running kafkaIO using KafkaIO's read via
>>> withConsumerFactorFn().
>>>
>>> We want to do similar things with elasticseachIO where we want to update
>>> its worker VM's truststore with our self signed root certificate so that
>>> when elasticsearchIO connects using HTTPS, it can connect successfully
>>> without ssl handshake failure. Currently we couldn't find any way to do so
>>> with ElasticsearchIO. We tried various possible workarounds like:
>>>
>>> 1. Trying JvmInitializer to initialise Jvm with truststore using
>>> System.setproperty for javax.net.ssl.trustStore,
>>> 2. Transferring our jar to GCP's appengine where we start jar using
>>> Djavax.net.ssl.trustStore and then triggering beam job from there.
>>> 3. Setting elasticsearchIO flag withTrustSelfSigned to true (I don't
>>> think it will work because looking at the source code, it looks like it has
>>> dependency with keystorePath)
>>>
>>> But nothing worked. When we logged in to worker VMs, it looked like our
>>> trustStore never made it to worker VM. All elasticsearchIO connections
>>> failed with the following exception:
>>>
>>> sun.security.validator.ValidatorException: PKIX path building failed:
>>> sun.security.provider.certpath.SunCertPathBuilderException: unable to find
>>> valid certification path to requested target
>>> sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
>>>
>>>
>>> Right now to unblock ourselves, we have added proxy with letsencrypt
>>> root ca between beam and Elasticsearch cluster and beam's elasticsearchIO
>>> connect successfully to proxy using letsencrypt root certificate. We won't
>>> want to use Letsencrypt root certificater for internal services as it
>>> expires every three months.  Is there a way, just like kafkaIO, to use
>>> selfsigned root certificate with elasticsearchIO? Or is there a way to
>>> update java cacerts on worker VMs where beam job is running?
>>>
>>> Looking forward for some suggestions soon.
>>>
>>> Thanks and Regards
>>> Mohil Khare
>>>
>>


Re: Keeping keys in a state for a very very long time (keys expiry unknown)

2020-04-06 Thread Mohil Khare
Sure thing.. I would love to contribute.

Thanks
Mohil



On Mon, Apr 6, 2020 at 6:17 PM Reza Ardeshir Rokni 
wrote:

> Great! BTW if you get the time and wanted to contribute back to beam there
> is a nice section to record cool patterns:
>
> https://beam.apache.org/documentation/patterns/overview/
>
> This would make a great one!
>
> On Tue, 7 Apr 2020 at 09:12, Mohil Khare  wrote:
>
>> No ... that's a valid answer. Since I wanted to have a long window size
>> per key and since we can't use state with session windows, I am using a
>> sliding window for let's say 72 hrs which starts every hour.
>>
>> Thanks a lot Reza for your input.
>>
>> Regards
>> Mohil
>>
>> On Mon, Apr 6, 2020 at 6:09 PM Reza Ardeshir Rokni 
>> wrote:
>>
>>> Depends on the use case, Global state comes with the technical debt of
>>> having to do your own GC, but comes with more control. You could
>>> implement the pattern above with a long FixedWindow as well, which will
>>> take care of the GC within the window  bound.
>>>
>>> Sorry, its not a yes / no answer :-)
>>>
>>> On Tue, 7 Apr 2020 at 09:03, Mohil Khare  wrote:
>>>
>>>> Thanks a lot Reza for your quick response. Yeah saving the data in an
>>>> external system after timer expiry makes sense.
>>>> So do you suggest using a global window for maintaining state ?
>>>>
>>>> Thanks and regards
>>>> Mohil
>>>>
>>>> On Mon, Apr 6, 2020 at 5:37 PM Reza Ardeshir Rokni 
>>>> wrote:
>>>>
>>>>> Are you able to make use of the following pattern?
>>>>>
>>>>> Store StateA-metadata until no activity for Duration X, you can use a
>>>>> Timer to check this, then expire the value, but store in an
>>>>> external system. If you get a record that does want this value after
>>>>> expiry, call out to the external system and store the value again in key
>>>>> StateA-metadata.
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Tue, 7 Apr 2020 at 08:03, Mohil Khare  wrote:
>>>>>
>>>>>> Hello all,
>>>>>> We are attempting a implement a use case where beam (java sdk) reads
>>>>>> two kind of records from data stream like Kafka:
>>>>>>
>>>>>> 1. Records of type A containing key and corresponding metadata.
>>>>>> 2. Records of type B containing the same key, but no metadata. Beam
>>>>>> then needs to fill metadata for records of type B  by doing a lookup for
>>>>>> metadata using keys received in records of type A.
>>>>>>
>>>>>> Idea is to save metadata or rather state for keys received in records
>>>>>> of type A and then do a lookup when records of type B are received.
>>>>>> I have implemented this using the "@State" construct of beam. However
>>>>>> my problem is that we don't know when keys should expire. I don't think
>>>>>> keeping a global window will be a good idea as there could be many keys
>>>>>> (may be millions over a period of time) to be saved in a state.
>>>>>>
>>>>>> What is the best way to achieve this? I was reading about RedisIO,
>>>>>> but found that it is still in the experimental stage. Can someone please
>>>>>> recommend any solution to achieve this.
>>>>>>
>>>>>> Thanks and regards
>>>>>> Mohil
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>


Re: Keeping keys in a state for a very very long time (keys expiry unknown)

2020-04-06 Thread Mohil Khare
No ... that's a valid answer. Since I wanted to have a long window size per
key and since we can't use state with session windows, I am using a sliding
window for let's say 72 hrs which starts every hour.
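For reference, the windowing described above would look roughly like this (the element type
and variable name are illustrative):

input.apply("Sliding_Window_72Hr_Every_1Hr",
    Window.<KV<String, Record>>into(
        SlidingWindows.of(Duration.standardHours(72)).every(Duration.standardHours(1))));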

Thanks a lot Reza for your input.

Regards
Mohil

On Mon, Apr 6, 2020 at 6:09 PM Reza Ardeshir Rokni 
wrote:

> Depends on the use case, Global state comes with the technical debt of
> having to do your own GC, but comes with more control. You could
> implement the pattern above with a long FixedWindow as well, which will
> take care of the GC within the window  bound.
>
> Sorry, its not a yes / no answer :-)
>
> On Tue, 7 Apr 2020 at 09:03, Mohil Khare  wrote:
>
>> Thanks a lot Reza for your quick response. Yeah saving the data in an
>> external system after timer expiry makes sense.
>> So do you suggest using a global window for maintaining state ?
>>
>> Thanks and regards
>> Mohil
>>
>> On Mon, Apr 6, 2020 at 5:37 PM Reza Ardeshir Rokni 
>> wrote:
>>
>>> Are you able to make use of the following pattern?
>>>
>>> Store StateA-metadata until no activity for Duration X, you can use a
>>> Timer to check this, then expire the value, but store in an
>>> external system. If you get a record that does want this value after
>>> expiry, call out to the external system and store the value again in key
>>> StateA-metadata.
>>>
>>> Cheers
>>>
>>> On Tue, 7 Apr 2020 at 08:03, Mohil Khare  wrote:
>>>
>>>> Hello all,
>>>> We are attempting a implement a use case where beam (java sdk) reads
>>>> two kind of records from data stream like Kafka:
>>>>
>>>> 1. Records of type A containing key and corresponding metadata.
>>>> 2. Records of type B containing the same key, but no metadata. Beam
>>>> then needs to fill metadata for records of type B  by doing a lookup for
>>>> metadata using keys received in records of type A.
>>>>
>>>> Idea is to save metadata or rather state for keys received in records
>>>> of type A and then do a lookup when records of type B are received.
>>>> I have implemented this using the "@State" construct of beam. However
>>>> my problem is that we don't know when keys should expire. I don't think
>>>> keeping a global window will be a good idea as there could be many keys
>>>> (may be millions over a period of time) to be saved in a state.
>>>>
>>>> What is the best way to achieve this? I was reading about RedisIO, but
>>>> found that it is still in the experimental stage. Can someone please
>>>> recommend any solution to achieve this.
>>>>
>>>> Thanks and regards
>>>> Mohil
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>


Re: Keeping keys in a state for a very very long time (keys expiry unknown)

2020-04-06 Thread Mohil Khare
Thanks a lot Reza for your quick response. Yeah saving the data in an
external system after timer expiry makes sense.
So do you suggest using a global window for maintaining state ?

Thanks and regards
Mohil

On Mon, Apr 6, 2020 at 5:37 PM Reza Ardeshir Rokni 
wrote:

> Are you able to make use of the following pattern?
>
> Store StateA-metadata until no activity for Duration X, you can use a
> Timer to check this, then expire the value, but store in an
> external system. If you get a record that does want this value after
> expiry, call out to the external system and store the value again in key
> StateA-metadata.
>
> Cheers
>
> On Tue, 7 Apr 2020 at 08:03, Mohil Khare  wrote:
>
>> Hello all,
>> We are attempting a implement a use case where beam (java sdk) reads two
>> kind of records from data stream like Kafka:
>>
>> 1. Records of type A containing key and corresponding metadata.
>> 2. Records of type B containing the same key, but no metadata. Beam then
>> needs to fill metadata for records of type B  by doing a lookup for
>> metadata using keys received in records of type A.
>>
>> Idea is to save metadata or rather state for keys received in records of
>> type A and then do a lookup when records of type B are received.
>> I have implemented this using the "@State" construct of beam. However my
>> problem is that we don't know when keys should expire. I don't think
>> keeping a global window will be a good idea as there could be many keys
>> (may be millions over a period of time) to be saved in a state.
>>
>> What is the best way to achieve this? I was reading about RedisIO, but
>> found that it is still in the experimental stage. Can someone please
>> recommend any solution to achieve this.
>>
>> Thanks and regards
>> Mohil
>>
>>
>>
>>
>>
>>


Re: Using Self signed root ca for https connection in eleasticsearchIO

2020-04-06 Thread Mohil Khare
Any update on this? Shall I open a jira for this support ?

Thanks and regards
Mohil

On Sun, Mar 22, 2020 at 9:36 PM Mohil Khare  wrote:

> Hi,
> This is Mohil from Prosimo, a small bay area based stealth mode startup.
> We use Beam (on version 2.19) with google dataflow in our analytics
> pipeline with Kafka and PubSub as source while GCS, BigQuery and
> ElasticSearch as our sink.
>
> We want to use our private self signed root ca for tls connections between
> our internal services viz kafka, ElasticSearch, beam etc. We are able to
> setup secure tls connection between beam and kafka using self signed root
> certificate in keystore.jks and truststore.jks and transferring it to
> worker VMs running kafkaIO using KafkaIO's read via withConsumerFactorFn().
>
> We want to do similar things with elasticseachIO where we want to update
> its worker VM's truststore with our self signed root certificate so that
> when elasticsearchIO connects using HTTPS, it can connect successfully
> without ssl handshake failure. Currently we couldn't find any way to do so
> with ElasticsearchIO. We tried various possible workarounds like:
>
> 1. Trying JvmInitializer to initialise Jvm with truststore using
> System.setproperty for javax.net.ssl.trustStore,
> 2. Transferring our jar to GCP's appengine where we start jar using
> Djavax.net.ssl.trustStore and then triggering beam job from there.
> 3. Setting elasticsearchIO flag withTrustSelfSigned to true (I don't think
> it will work because looking at the source code, it looks like it has
> dependency with keystorePath)
>
> But nothing worked. When we logged in to worker VMs, it looked like our
> trustStore never made it to worker VM. All elasticsearchIO connections
> failed with the following exception:
>
> sun.security.validator.ValidatorException: PKIX path building failed:
> sun.security.provider.certpath.SunCertPathBuilderException: unable to find
> valid certification path to requested target
> sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
>
>
> Right now to unblock ourselves, we have added proxy with letsencrypt root
> ca between beam and Elasticsearch cluster and beam's elasticsearchIO
> connect successfully to proxy using letsencrypt root certificate. We won't
> want to use Letsencrypt root certificater for internal services as it
> expires every three months.  Is there a way, just like kafkaIO, to use
> selfsigned root certificate with elasticsearchIO? Or is there a way to
> update java cacerts on worker VMs where beam job is running?
>
> Looking forward for some suggestions soon.
>
> Thanks and Regards
> Mohil Khare
>


Keeping keys in a state for a very very long time (keys expiry unknown)

2020-04-06 Thread Mohil Khare
Hello all,
We are attempting to implement a use case where beam (java sdk) reads two
kinds of records from a data stream like Kafka:

1. Records of type A containing key and corresponding metadata.
2. Records of type B containing the same key, but no metadata. Beam then
needs to fill metadata for records of type B  by doing a lookup for
metadata using keys received in records of type A.

The idea is to save metadata, or rather state, for keys received in records of
type A and then do a lookup when records of type B are received.
I have implemented this using the "@State" construct of beam. However, my
problem is that we don't know when keys should expire. I don't think
keeping a global window will be a good idea, as there could be many keys
(maybe millions over a period of time) to be saved in a state.

What is the best way to achieve this? I was reading about RedisIO, but
found that it is still in the experimental stage. Can someone please
recommend any solution to achieve this.

Thanks and regards
Mohil


Using Self signed root ca for https connection in eleasticsearchIO

2020-03-22 Thread Mohil Khare
Hi,
This is Mohil from Prosimo, a small bay area based stealth mode startup. We
use Beam (on version 2.19) with google dataflow in our analytics pipeline
with Kafka and PubSub as source while GCS, BigQuery and ElasticSearch as
our sink.

We want to use our private self signed root ca for tls connections between
our internal services, viz. kafka, ElasticSearch, beam etc. We are able to
set up a secure tls connection between beam and kafka by using the self signed
root certificate in keystore.jks and truststore.jks and transferring them to
the worker VMs running kafkaIO via KafkaIO read's withConsumerFactoryFn().

We want to do a similar thing with elasticsearchIO, where we want to update
the worker VM's truststore with our self signed root certificate so that
when elasticsearchIO connects using HTTPS, it can connect successfully
without an ssl handshake failure. Currently we couldn't find any way to do so
with ElasticsearchIO. We tried various possible workarounds like:

1. Trying a JvmInitializer to initialise the JVM with the truststore using
System.setProperty for javax.net.ssl.trustStore,
2. Transferring our jar to GCP's appengine, where we start the jar with
-Djavax.net.ssl.trustStore and then trigger the beam job from there.
3. Setting the elasticsearchIO flag withTrustSelfSigned to true (I don't think
it will work because, looking at the source code, it looks like it has a
dependency on keystorePath)

But nothing worked. When we logged in to worker VMs, it looked like our
trustStore never made it to worker VM. All elasticsearchIO connections
failed with the following exception:

sun.security.validator.ValidatorException: PKIX path building failed:
sun.security.provider.certpath.SunCertPathBuilderException: unable to find
valid certification path to requested target
sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)


Right now, to unblock ourselves, we have added a proxy with a letsencrypt root
ca between beam and the Elasticsearch cluster, and beam's elasticsearchIO
connects successfully to the proxy using the letsencrypt root certificate. We
don't want to use the Letsencrypt root certificate for internal services as it
expires every three months. Is there a way, just like with kafkaIO, to use a
self signed root certificate with elasticsearchIO? Or is there a way to
update the java cacerts on the worker VMs where the beam job is running?

Looking forward for some suggestions soon.

Thanks and Regards
Mohil Khare