Re: ElasticIO retry configuration exception

2018-10-10 Thread Romain Manni-Bucau
Hi Wout,

Maybe check your classpath HTTP client versions (against
https://github.com/apache/beam/blob/v2.7.0/sdks/java/io/elasticsearch/build.gradle,
for instance).
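
For a quick runtime check (a hedged sketch, not from this thread): the following
prints which jar each relevant class is actually loaded from, so the versions on
the classpath can be compared against the ones pinned in that build.gradle. The
httpclient/httpcore-nio class names come from the stacktrace below; the RestClient
probe is an assumption about the client ElasticsearchIO uses.

import java.security.CodeSource;

public class HttpClientClasspathCheck {
  public static void main(String[] args) throws ClassNotFoundException {
    // Probe one class per suspect dependency and print the jar it was loaded from.
    String[] probes = {
      "org.apache.http.impl.client.CloseableHttpClient",  // httpclient
      "org.apache.http.nio.entity.ContentInputStream",    // httpcore-nio (seen in the stacktrace)
      "org.elasticsearch.client.RestClient"               // ES REST client (assumed dependency)
    };
    for (String name : probes) {
      CodeSource src = Class.forName(name).getProtectionDomain().getCodeSource();
      System.out.println(name + " -> " + (src == null ? "unknown (bootstrap?)" : src.getLocation()));
    }
  }
}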

Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn | Book



On Wed, Oct 10, 2018 at 15:37, Wout Scheepers <
wout.scheep...@vente-exclusive.com> wrote:

> Hey JB,
>
> Thanks for your fast reply.
> The elastic version we're using is 5.6.2.
>
> "version": {
> "number": "5.6.2",
> "build_hash": "57e20f3",
> "build_date": "2017-09-23T13:16:45.703Z",
> "build_snapshot": false,
> "lucene_version": "6.6.1"
> }
>
>
> Wout
>
>
>
> On 10/10/2018, 15:34, "Jean-Baptiste Onofré"  wrote:
>
> Hi Wout,
>
> what's the elasticsearch version ? (just to try to reproduce)
>
> Thanks,
> Regards
> JB
>
> On 10/10/2018 15:31, Wout Scheepers wrote:
> > Hey all,
> >
> >
> >
> > When using .withRetryConfiguration() for ElasticsearchIO, I get the
> > following stacktrace:
> >
> >
> >
> > Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
> > No content to map due to end-of-input
> > at [Source: (org.apache.http.nio.entity.ContentInputStream); line: 1, column: 0]
> >    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
> >    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4133)
> >    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3988)
> >    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3058)
> >    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.parseResponse(ElasticsearchIO.java:167)
> >    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:171)
> >    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1213)
> >    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1183)
> >
> >
> >
> > I’ve been breaking my head on this one.
> >
> > Apparently the elastic Response object can’t be parsed anymore in the
> > checkForErrors() method.
> >
> > However, it is parsed successfully in the default RetryPredicate’s test
> > method, which is called in flushBatch() in the if clause related to the
> > retryConfig (ElasticsearchIO:1201).
> >
> > As far as I know, the Response object is not altered.
> >
> >
> >
> > Any clues why this doesn’t work for me?
> >
> > I really need this feature, as inserting 40M documents into elastic
> > results in too many retry timeouts ☺.
> >
> >
> >
> > Thanks!
> > Wout
> >
> >
> >
> >
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
>
>


Re: How to use of BigQueryIO Method.FILE_LOADS when reading from a unbounded source?

2018-10-10 Thread Kaymak, Tobias
Hi Wout,

you are so right - I forgot the --tempLocation= parameter when launching
and after that I also needed to set the number of shards by adding:
.withNumFileShards(1)
Thank you!

Tobi
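
For anyone hitting the same thing, a minimal sketch of the two settings together
(the bucket, table, and the rows PCollection are placeholders, not the ones from
this thread):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class FileLoadsSketch {
  static void writeWithFileLoads(PCollection<TableRow> rows, PipelineOptions options) {
    // Either pass --tempLocation=gs://... when launching or set it on the options.
    options.setTempLocation("gs://my-staging-bucket/bq-tmp"); // placeholder bucket

    rows.apply(
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table") // placeholder table
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withTriggeringFrequency(Duration.standardMinutes(10))
            .withNumFileShards(1) // required once a triggering frequency is set
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
  }
}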

On Wed, Oct 10, 2018 at 3:23 PM Wout Scheepers <
wout.scheep...@vente-exclusive.com> wrote:

> Hey Tobias,
>
>
> org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expandTriggered(BatchLoads.java:212)
>
>
>
> points to the following code snippet (starting from BatchLoads.java:210) :
>
> if (bigQueryServices == null) {
>   try {
>     GcsPath.fromUri(tempLocation);
>   } catch (IllegalArgumentException e) {
>     throw new IllegalArgumentException(
>         String.format(
>             "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
>             tempLocation),
>         e);
>   }
> }
>
>
>
> Are you sure your tempLocation is set correctly? I guess it’s needed for
> staging a BigQuery load job instead of using streaming inserts.
>
>
>
> Wout
>
>
>
>
>
>
>
> *From: *"Kaymak, Tobias" 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Wednesday, 10 October 2018 at 14:18
> *To: *"user@beam.apache.org" 
> *Subject: *How to use of BigQueryIO Method.FILE_LOADS when reading from a
> unbounded source?
>
>
>
> I am trying to read from an unbounded source and using FILE_LOADS instead
> of streaming inserts towards BigQuery.
>
> If I don't have the following two lines
>
>
>
>  .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>
>   .withTriggeringFrequency(Duration.standardMinutes(10))
>
>
>
> my code works just fine, but uses streaming inserts. If I add them I get a
> non-specific stacktrace like:
>
>
>
> Exception in thread "main" java.lang.IllegalArgumentException
>
> at com.google.common.base.Preconditions.checkArgument(Preconditions.java:108)
> at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expandTriggered(BatchLoads.java:212)
> at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:557)
> at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:78)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
> at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:1694)
> at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1638)
> at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1070)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
> at ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:181)
>
>
>
> where line 181 is the first line of the following code excerpt:
>
>
>
> BigQueryIO.write()
>     .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>     .withTriggeringFrequency(Duration.standardMinutes(10))
>     .to(
>         new DynamicDestinations() {
>           @Override
>           public String getDestination(ValueInSingleWindow element) {
>             return element.getValue().getTopic();
>           }
>
>           @Override
>           public TableDestination getTable(String destination) {
>             return new TableDestination(
>                 "charged-dialect-824:KafkaStaging.test", null,
>                 new TimePartitioning().setType("DAY"));
>           }
>
>           @Override
>           public TableSchema getSchema(String destination) {
>             return inputMessagesConfig.getTableSchema(destination);
>           }
>         })
>     .withFormatFunction(
>         (SerializableFunction) event -> convertUserEventToTableRow(event))
>     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>
>
>
> I am not sure what I am doing wrong here; I tried higher values for the
> Duration, but that didn't help. I wasn't able to find the root cause for
> the exception with the debugger. Any idea how I can get to the bottom of this?
>
> Tobi
>
>
>


-- 
Tobias Kaymak
Data Engineer

tobias.kay...@ricardo.ch
www.ricardo.ch


Re: ElasticIO retry configuration exception

2018-10-10 Thread Wout Scheepers
Hey JB,

Thanks for your fast reply.
The elastic version we're using is 5.6.2.

"version": {
"number": "5.6.2",
"build_hash": "57e20f3",
"build_date": "2017-09-23T13:16:45.703Z",
"build_snapshot": false,
"lucene_version": "6.6.1"
}


Wout



On 10/10/2018, 15:34, "Jean-Baptiste Onofré"  wrote:

Hi Wout,

what's the elasticsearch version ? (just to try to reproduce)

Thanks,
Regards
JB

On 10/10/2018 15:31, Wout Scheepers wrote:
> Hey all,
> 
>  
> 
> When using .withRetryConfiguration() for ElasticsearchIO, I get the
> following stacktrace:
> 
>  
> 
> Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
> No content to map due to end-of-input
> at [Source: (org.apache.http.nio.entity.ContentInputStream); line: 1, column: 0]
>    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
>    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4133)
>    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3988)
>    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3058)
>    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.parseResponse(ElasticsearchIO.java:167)
>    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:171)
>    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1213)
>    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1183)
> 
>  
> 
> I’ve been breaking my head on this one.
> 
> Apparently the elastic Response object can’t be parsed anymore in the
> checkForErrors() method.
> 
> However, it is parsed successfully in the default RetryPredicate’s test
> method, which is called in flushBatch() in the if clause related to the
> retryConfig (ElasticsearchIO:1201).
> 
> As far as I know, the Response object is not altered.
> 
>  
> 
> Any clues why this doesn’t work for me?
> 
> I really need this feature, as inserting 40M documents into elastic
> results in too many retry timeouts ☺.
> 
>  
> 
> Thanks!
> Wout
> 
>  
> 
>  
> 

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com




Re: ElasticIO retry configuration exception

2018-10-10 Thread Jean-Baptiste Onofré
Hi Wout,

what's the elasticsearch version ? (just to try to reproduce)

Thanks,
Regards
JB

On 10/10/2018 15:31, Wout Scheepers wrote:
> Hey all,
> 
>  
> 
> When using .withRetryConfiguration() for ElasticsearchIO, I get the
> following stacktrace:
> 
>  
> 
> Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
> No content to map due to end-of-input
> at [Source: (org.apache.http.nio.entity.ContentInputStream); line: 1, column: 0]
>    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
>    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4133)
>    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3988)
>    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3058)
>    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.parseResponse(ElasticsearchIO.java:167)
>    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:171)
>    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1213)
>    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1183)
> 
>  
> 
> I’ve been breaking my head on this one.
> 
> Apparently the elastic Response object can’t be parsed anymore in the
> checkForErrors() method.
> 
> However, it is parsed successfully in the default RetryPredicate’s test
> method, which is called in flushBatch() in the if clause related to the
> retryConfig (ElasticsearchIO:1201).
> 
> As far as I know, the Response object is not altered.
> 
>  
> 
> Any clues why this doesn’t work for me?
> 
> I really need this feature, as inserting 40M documents into elastic
> results in too many retry timeouts ☺.
> 
>  
> 
> Thanks!
> Wout
> 
>  
> 
>  
> 

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


ElasticIO retry configuration exception

2018-10-10 Thread Wout Scheepers
Hey all,

When using .withRetryConfiguration() for ElasticsearchIO, I get the following
stacktrace:

Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
No content to map due to end-of-input
at [Source: (org.apache.http.nio.entity.ContentInputStream); line: 1, column: 0]
   at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
   at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4133)
   at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3988)
   at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3058)
   at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.parseResponse(ElasticsearchIO.java:167)
   at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:171)
   at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1213)
   at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1183)

I’ve been breaking my head on this one.
Apparently the elastic Response object can’t be parsed anymore in the 
checkForErrors() method.
However, it is parsed successfully in the default RetryPredicate’s test method, 
which is called in flushBatch() in the if clause related to the retryConfig 
(ElasticsearchIO:1201).
As far as I know, the Response object is not altered.

Any clues why this doesn’t work for me?
I really need this feature, as inserting 40M documents into elastic results in 
too many retry timeouts ☺.

Thanks!
Wout
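
For context, a minimal sketch of the write that triggers this, as I read the
Beam 2.7.0 ElasticsearchIO API; the hosts, index, type, batch size, retry values
and the docs PCollection are placeholders, not the ones from this pipeline:

import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class ElasticRetrySketch {
  static void writeWithRetry(PCollection<String> docs) {
    ElasticsearchIO.ConnectionConfiguration connection =
        ElasticsearchIO.ConnectionConfiguration.create(
            new String[] {"http://localhost:9200"}, "my-index", "my-type");

    docs.apply(
        ElasticsearchIO.write()
            .withConnectionConfiguration(connection)
            // Retry throttled bulk requests a few times before giving up.
            .withRetryConfiguration(
                ElasticsearchIO.RetryConfiguration.create(3, Duration.standardSeconds(90)))
            .withMaxBatchSize(1000));
  }
}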




Re: How to use of BigQueryIO Method.FILE_LOADS when reading from a unbounded source?

2018-10-10 Thread Wout Scheepers
Hey Tobias,

org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expandTriggered(BatchLoads.java:212)



points to the following code snippet (starting from BatchLoads.java:210) :

if (bigQueryServices == null) {
  try {
    GcsPath.fromUri(tempLocation);
  } catch (IllegalArgumentException e) {
    throw new IllegalArgumentException(
        String.format(
            "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
            tempLocation),
        e);
  }
}



Are you sure your tempLocation is set correctly? I guess it’s needed for
staging a BigQuery load job instead of using streaming inserts.



Wout
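
A quick pre-flight check mirroring that precondition (a hedged sketch using the
standard Beam pipeline options; the error message is made up):

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

class TempLocationCheck {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    String tempLocation = options.getTempLocation();
    // FILE_LOADS stages load files on GCS, so fail fast if this is not a gs:// path.
    if (tempLocation == null || !tempLocation.startsWith("gs://")) {
      throw new IllegalArgumentException(
          "FILE_LOADS needs --tempLocation to point at a gs:// path, but got: " + tempLocation);
    }
  }
}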



From: "Kaymak, Tobias" 
Reply-To: "user@beam.apache.org" 
Date: Wednesday, 10 October 2018 at 14:18
To: "user@beam.apache.org" 
Subject: How to use of BigQueryIO Method.FILE_LOADS when reading from a 
unbounded source?

I am trying to read from an unbounded source and using FILE_LOADS instead of 
streaming inserts towards BigQuery.

If I don't have the following two lines

 .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
  .withTriggeringFrequency(Duration.standardMinutes(10))

my code works just fine, but uses streaming inserts. If I add them I get a 
non-specific stacktrace like:

Exception in thread "main" java.lang.IllegalArgumentException
at com.google.common.base.Preconditions.checkArgument(Preconditions.java:108)
at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expandTriggered(BatchLoads.java:212)
at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:557)
at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:78)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:1694)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1638)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1070)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
at ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:181)

where line 181 is the first line of the following code excerpt:

BigQueryIO.write()
    .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
    .withTriggeringFrequency(Duration.standardMinutes(10))
    .to(
        new DynamicDestinations() {
          @Override
          public String getDestination(ValueInSingleWindow element) {
            return element.getValue().getTopic();
          }

          @Override
          public TableDestination getTable(String destination) {
            return new TableDestination(
                "charged-dialect-824:KafkaStaging.test", null,
                new TimePartitioning().setType("DAY"));
          }

          @Override
          public TableSchema getSchema(String destination) {
            return inputMessagesConfig.getTableSchema(destination);
          }
        })
    .withFormatFunction(
        (SerializableFunction) event -> convertUserEventToTableRow(event))
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

I am not sure what I am doing wrong here; I tried higher values for the
Duration, but that didn't help. I wasn't able to find the root cause for the
exception with the debugger. Any idea how I can get to the bottom of this?

Tobi



How to use of BigQueryIO Method.FILE_LOADS when reading from a unbounded source?

2018-10-10 Thread Kaymak, Tobias
I am trying to read from an unbounded source and using FILE_LOADS instead
of streaming inserts towards BigQuery.

If I don't have the following two lines

 .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
  .withTriggeringFrequency(Duration.standardMinutes(10))

my code works just fine, but uses streaming inserts. If I add them I get a
non-specific stacktrace like:

Exception in thread "main" java.lang.IllegalArgumentException
at com.google.common.base.Preconditions.checkArgument(Preconditions.java:108)
at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expandTriggered(BatchLoads.java:212)
at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:557)
at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:78)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:1694)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1638)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1070)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
at ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:181)

where line 181 is the first line of the following code excerpt:

BigQueryIO.write()
    .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
    .withTriggeringFrequency(Duration.standardMinutes(10))
    .to(
        new DynamicDestinations() {
          @Override
          public String getDestination(ValueInSingleWindow element) {
            return element.getValue().getTopic();
          }

          @Override
          public TableDestination getTable(String destination) {
            return new TableDestination(
                "charged-dialect-824:KafkaStaging.test", null,
                new TimePartitioning().setType("DAY"));
          }

          @Override
          public TableSchema getSchema(String destination) {
            return inputMessagesConfig.getTableSchema(destination);
          }
        })
    .withFormatFunction(
        (SerializableFunction) event -> convertUserEventToTableRow(event))
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

I am not sure what I am doing wrong here; I tried higher values for the
Duration, but that didn't help. I wasn't able to find the root cause for the
exception with the debugger. Any idea how I can get to the bottom of this?

Tobi


Re: Issue with GroupByKey in BeamSql using SparkRunner

2018-10-10 Thread Jean-Baptiste Onofré
It may be related: I have a pipeline (streaming with sliding windows) that
works fine with the Direct and Flink runners, but I don't get any results
when using the Spark runner.

I'm going to investigate this using my beam-samples.

Regards
JB

On 10/10/2018 11:16, Ismaël Mejía wrote:
> Are you trying this on a particular Spark distribution or just locally?
> I ask because there was a data corruption issue with Spark 2.3.1 (the
> previous version used by Beam):
> https://issues.apache.org/jira/browse/SPARK-23243
> 
> Current Beam master (and the next release) moves Spark to version 2.3.2,
> which should fix some of the data correctness issues (maybe yours too).
> Can you give it a try and report back whether it fixes your issue?
> 
> 
> On Tue, Oct 9, 2018 at 6:45 PM Vishwas Bm  wrote:
>>
>> Hi Kenn,
>>
>> We are using Beam 2.6 and spark-submit to submit jobs to a Spark 2.2
>> cluster on Kubernetes.
>>
>>
>> On Tue, Oct 9, 2018, 9:29 PM Kenneth Knowles  wrote:
>>>
>>> Thanks for the report! I filed 
>>> https://issues.apache.org/jira/browse/BEAM-5690 to track the issue.
>>>
>>> Can you share what version of Beam you are using?
>>>
>>> Kenn
>>>
>>> On Tue, Oct 9, 2018 at 3:18 AM Vishwas Bm  wrote:

 We are trying to set up a pipeline using BeamSql, and the trigger used is
 the default (AfterWatermark crosses the window).
 Below is the pipeline:

 KafkaSource (KafkaIO) ---> Windowing (FixedWindow 1min) ---> BeamSql ---> KafkaSink (KafkaIO)

 We are using the Spark runner for this.
 The BeamSql query is:
  select Col3, count(*) as count_col1 from PCOLLECTION GROUP BY Col3

 We are grouping by Col3 which is a string. It can hold values string[0-9].

 The records are getting emitted to the Kafka sink every 1 min, but the
 output records in Kafka are not as expected.
 Below is the output observed (WST and WET indicate window start time and
 window end time):

 {"count_col1":1,"Col3":"string5","WST":"2018-10-09  09-55-00   
 +","WET":"2018-10-09  09-56-00   +"}
 {"count_col1":3,"Col3":"string7","WST":"2018-10-09  09-55-00   
 +","WET":"2018-10-09  09-56-00   +"}
 {"count_col1":2,"Col3":"string8","WST":"2018-10-09  09-55-00   
 +","WET":"2018-10-09  09-56-00   +"}
 {"count_col1":1,"Col3":"string2","WST":"2018-10-09  09-55-00   
 +","WET":"2018-10-09  09-56-00   +"}
 {"count_col1":1,"Col3":"string6","WST":"2018-10-09  09-55-00   
 +","WET":"2018-10-09  09-56-00   +"}
 {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00   
 +","WET":"2018-10-09  09-56-00   +"}
 {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00   
 +","WET":"2018-10-09  09-56-00   +"}
 {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00   
 +","WET":"2018-10-09  09-56-00   +"}
 {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00   
 +","WET":"2018-10-09  09-56-00   +"}
 {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00   
 +","WET":"2018-10-09  09-56-00   +"}
 {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00   
 +","WET":"2018-10-09  09-56-00   +"}
 {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00   
 +","WET":"2018-10-09  09-56-00   +"}
 {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00   
 +","WET":"2018-10-09  09-56-00   +"}

 We ran the same pipeline using the Direct and Flink runners and we don't
 see 0-value entries for count_col1.

 As per the Beam capability matrix
 (https://beam.apache.org/documentation/runners/capability-matrix/#cap-summary-what),
 GroupBy is not fully supported; is this one of those cases?
 Thanks & Regards,
 Vishwas


-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
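
As a debugging aid for the BeamSql GROUP BY pipeline quoted above (a hedged
sketch, not from the thread): the same per-key count can be expressed without
SQL, which may help tell whether the spurious zero counts come from BeamSql or
from the Spark runner's GroupByKey. col3Values stands for a PCollection of the
Col3 field of each record.

import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class GroupByCrossCheck {
  static PCollection<KV<String, Long>> countPerCol3(PCollection<String> col3Values) {
    // Same 1-minute fixed windows and default trigger as the SQL pipeline,
    // but counted with Count.perElement() instead of a GROUP BY query.
    return col3Values
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(Count.perElement());
  }
}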


Re: Issue with GroupByKey in BeamSql using SparkRunner

2018-10-10 Thread Ismaël Mejía
Are you trying this on a particular Spark distribution or just locally?
I ask because there was a data corruption issue with Spark 2.3.1 (the
previous version used by Beam):
https://issues.apache.org/jira/browse/SPARK-23243

Current Beam master (and the next release) moves Spark to version 2.3.2,
which should fix some of the data correctness issues (maybe yours too).
Can you give it a try and report back whether it fixes your issue?


On Tue, Oct 9, 2018 at 6:45 PM Vishwas Bm  wrote:
>
> Hi Kenn,
>
> We are using Beam 2.6 and spark-submit to submit jobs to a Spark 2.2
> cluster on Kubernetes.
>
>
> On Tue, Oct 9, 2018, 9:29 PM Kenneth Knowles  wrote:
>>
>> Thanks for the report! I filed 
>> https://issues.apache.org/jira/browse/BEAM-5690 to track the issue.
>>
>> Can you share what version of Beam you are using?
>>
>> Kenn
>>
>> On Tue, Oct 9, 2018 at 3:18 AM Vishwas Bm  wrote:
>>>
>>> We are trying to set up a pipeline using BeamSql, and the trigger used is
>>> the default (AfterWatermark crosses the window).
>>> Below is the pipeline:
>>>
>>> KafkaSource (KafkaIO) ---> Windowing (FixedWindow 1min) ---> BeamSql ---> KafkaSink (KafkaIO)
>>>
>>> We are using the Spark runner for this.
>>> The BeamSql query is:
>>>  select Col3, count(*) as count_col1 from PCOLLECTION GROUP BY Col3
>>>
>>> We are grouping by Col3 which is a string. It can hold values string[0-9].
>>>
>>> The records are getting emitted to the Kafka sink every 1 min, but the
>>> output records in Kafka are not as expected.
>>> Below is the output observed (WST and WET indicate window start time and
>>> window end time):
>>>
>>> {"count_col1":1,"Col3":"string5","WST":"2018-10-09  09-55-00   
>>> +","WET":"2018-10-09  09-56-00   +"}
>>> {"count_col1":3,"Col3":"string7","WST":"2018-10-09  09-55-00   
>>> +","WET":"2018-10-09  09-56-00   +"}
>>> {"count_col1":2,"Col3":"string8","WST":"2018-10-09  09-55-00   
>>> +","WET":"2018-10-09  09-56-00   +"}
>>> {"count_col1":1,"Col3":"string2","WST":"2018-10-09  09-55-00   
>>> +","WET":"2018-10-09  09-56-00   +"}
>>> {"count_col1":1,"Col3":"string6","WST":"2018-10-09  09-55-00   
>>> +","WET":"2018-10-09  09-56-00   +"}
>>> {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00   
>>> +","WET":"2018-10-09  09-56-00   +"}
>>> {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00   
>>> +","WET":"2018-10-09  09-56-00   +"}
>>> {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00   
>>> +","WET":"2018-10-09  09-56-00   +"}
>>> {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00   
>>> +","WET":"2018-10-09  09-56-00   +"}
>>> {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00   
>>> +","WET":"2018-10-09  09-56-00   +"}
>>> {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00   
>>> +","WET":"2018-10-09  09-56-00   +"}
>>> {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00   
>>> +","WET":"2018-10-09  09-56-00   +"}
>>> {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00   
>>> +","WET":"2018-10-09  09-56-00   +"}
>>>
>>> We ran the same pipeline using the Direct and Flink runners and we don't
>>> see 0-value entries for count_col1.
>>>
>>> As per the Beam capability matrix
>>> (https://beam.apache.org/documentation/runners/capability-matrix/#cap-summary-what),
>>> GroupBy is not fully supported; is this one of those cases?
>>> Thanks & Regards,
>>> Vishwas
>>>