[jira] [Commented] (BEAM-4750) Beam performance degraded significantly since 2.4

2018-10-04 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16637867#comment-16637867
 ] 

Jozef Vilcek commented on BEAM-4750:


The last comment from [~jbonofre] says this is being evaluated. Is it actively 
being worked on right now?

I am only asking for the status and what can be expected in the near future.

> Beam performance degraded significantly since 2.4
> -
>
> Key: BEAM-4750
> URL: https://issues.apache.org/jira/browse/BEAM-4750
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Affects Versions: 2.4.0, 2.5.0
>Reporter: Vojtech Janota
>Assignee: Jean-Baptiste Onofré
>Priority: Major
> Fix For: 2.9.0
>
>
> Starting from Beam 2.4 onwards, the *InMemoryStateInternals* class in the 
> *beam-runners-core-java* module performs an expensive Coder encode/decode 
> operation when copying object state. This has a significant impact on 
> performance: in our case, pipelines that previously took a few minutes do not 
> finish within hours. Based on the discussion on the dev mailing list, the main 
> motivation for this change was to enforce Coder sanity, something that should 
> arguably remain within the realm of the DirectRunner and should not leak into 
> the core layer.
> Links to commits that introduced the new behaviour:
>  * [https://github.com/apache/beam/commit/32a427c]
>  * [https://github.com/apache/beam/commit/8151d82]
>  
> Additional details and surrounding discussion can be found here:
>  * 
> https://lists.apache.org/thread.html/1e329318a4dafe27b9ff304d9460d05d8966e5ceebaf4ebfb948e2b8@%3Cdev.beam.apache.org%3E
>  
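The cost described above can be sketched in plain Java: a deep copy through a serialize/deserialize round trip, standing in for a Coder encode/decode pass. This is an illustration only; the class and method names are hypothetical, not Beam's actual implementation.

```java
import java.io.*;

// Deep copy via a serialize/deserialize round trip, analogous to copying
// state through a Coder encode/decode (illustrative, not Beam's code).
public class CopyCost {
    static <T extends Serializable> T copyViaSerialization(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);               // "encode"
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();         // "decode"
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] state = new int[1_000_000];             // ~4 MB of state
        long start = System.nanoTime();
        int[] copy = copyViaSerialization(state);     // full round trip
        long elapsed = System.nanoTime() - start;
        // A plain reference hand-off would be O(1); this is O(size) each time.
        System.out.println("copied " + copy.length + " ints in " + elapsed + " ns");
    }
}
```

The point of the sketch: a reference hand-off is constant time, while an encode/decode copy scales with the size of the state, which is consistent with the slowdown reported above.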



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-5265) Can not test Timer with processing time domain

2018-09-17 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617696#comment-16617696
 ] 

Jozef Vilcek commented on BEAM-5265:


Any update on this, or an idea of when there might be one?

> Can not test Timer with processing time domain
> --
>
> Key: BEAM-5265
> URL: https://issues.apache.org/jira/browse/BEAM-5265
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core, runner-direct
>Reporter: Jozef Vilcek
>Assignee: Kenneth Knowles
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> I have a stateful DoFn which has a timer on the PROCESSING_TIME domain. While 
> writing tests, I noticed that it does not react to `advanceProcessingTime()` 
> on the test stream. The problem seems to be here:
> [https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L260]
> I can only tell that patching this place works for direct runner tests. I am 
> not sure about the broader impact on other runners since it is in `runner-core`.





[jira] [Comment Edited] (BEAM-5246) Beam metrics exported as flink metrics are not correct

2018-08-31 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16599032#comment-16599032
 ] 

Jozef Vilcek edited comment on BEAM-5246 at 8/31/18 5:15 PM:
-

I was looking around a bit more ... Does the attached PR make sense: forwarding 
metrics from the accumulator to the metricGroup only for the actual stepName?


was (Author: jozovilcek):
I was looking around a bit more ... Does the attached PR make sense, to forward 
metrics from accumulator to metricGroup only for actual stepName?

> Beam metrics exported as flink metrics are not correct
> --
>
> Key: BEAM-5246
> URL: https://issues.apache.org/jira/browse/BEAM-5246
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.6.0
>Reporter: Jozef Vilcek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In the Flink UI and the Flink native MetricReporter, I am seeing too many 
> instances of my Beam metric counter. It looks like the counter is materialised 
> for every operator running within the task, although it is emitted from only 
> one Beam step (which should map to one operator?). This produces double 
> counting.
> With a bit of debugging I noticed this happens for streaming jobs; in batch I 
> was not able to reproduce it. The problem might be in FlinkMetricContainer.
> [https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java#L86]
> The update seems to be called from operators after finishing a bundle. Data 
> from the accumulator are flushed to `runtimeContext.getMetricGroup()`. The 
> scope of the accumulator seems to differ from that of the metricGroup: across 
> different calls the scope components change, especially the operatorID. It 
> seems that during the run, `metricResult.getStep()` does not match the 
> operatorName of the metricGroup where the metric is being pushed.





[jira] [Commented] (BEAM-5246) Beam metrics exported as flink metrics are not correct

2018-08-31 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16599032#comment-16599032
 ] 

Jozef Vilcek commented on BEAM-5246:


I was looking around a bit more ... Does the attached PR make sense, forwarding 
metrics from the accumulator to the metricGroup only for the actual stepName?

> Beam metrics exported as flink metrics are not correct
> --
>
> Key: BEAM-5246
> URL: https://issues.apache.org/jira/browse/BEAM-5246
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.6.0
>Reporter: Jozef Vilcek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In the Flink UI and the Flink native MetricReporter, I am seeing too many 
> instances of my Beam metric counter. It looks like the counter is materialised 
> for every operator running within the task, although it is emitted from only 
> one Beam step (which should map to one operator?). This produces double 
> counting.
> With a bit of debugging I noticed this happens for streaming jobs; in batch I 
> was not able to reproduce it. The problem might be in FlinkMetricContainer.
> [https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java#L86]
> The update seems to be called from operators after finishing a bundle. Data 
> from the accumulator are flushed to `runtimeContext.getMetricGroup()`. The 
> scope of the accumulator seems to differ from that of the metricGroup: across 
> different calls the scope components change, especially the operatorID. It 
> seems that during the run, `metricResult.getStep()` does not match the 
> operatorName of the metricGroup where the metric is being pushed.





[jira] [Created] (BEAM-5265) Can not test Timer with processing time domain

2018-08-30 Thread Jozef Vilcek (JIRA)
Jozef Vilcek created BEAM-5265:
--

 Summary: Can not test Timer with processing time domain
 Key: BEAM-5265
 URL: https://issues.apache.org/jira/browse/BEAM-5265
 Project: Beam
  Issue Type: Bug
  Components: runner-core, runner-direct
Reporter: Jozef Vilcek
Assignee: Kenneth Knowles


I have a stateful DoFn which has a timer on the PROCESSING_TIME domain. While 
writing tests, I noticed that it does not react to `advanceProcessingTime()` on 
the test stream. The problem seems to be here:

[https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L260]

I can only tell that patching this place works for direct runner tests. I am 
not sure about the broader impact on other runners since it is in `runner-core`.
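As a rough sketch of the behaviour the test stream should exhibit, here is a manually advanced processing-time clock that fires every timer whose deadline has passed. This is a hypothetical helper for illustration, not Beam's actual timer machinery.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of what advanceProcessingTime() should trigger for
// PROCESSING_TIME timers (hypothetical; not Beam's implementation).
class ManualProcessingTime {
    private long nowMillis = 0;
    private final List<long[]> pending = new ArrayList<>(); // {deadline, id}
    final List<Long> fired = new ArrayList<>();             // ids of fired timers

    void setTimer(long id, long deadlineMillis) {
        pending.add(new long[] {deadlineMillis, id});
    }

    // Advancing the clock must fire every timer whose deadline has passed.
    void advanceTo(long millis) {
        nowMillis = millis;
        pending.removeIf(t -> {
            if (t[0] <= nowMillis) {
                fired.add(t[1]);
                return true;
            }
            return false;
        });
    }
}
```

The reported bug, in these terms, is that advancing the clock did not fire pending processing-time timers at all.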





[jira] [Comment Edited] (BEAM-5036) Optimize FileBasedSink's WriteOperation.moveToOutput()

2018-08-30 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16597108#comment-16597108
 ] 

Jozef Vilcek edited comment on BEAM-5036 at 8/30/18 6:49 AM:
-

[~timrobertson100] I guess if the rename() API is kept defensive (throwing an 
exception when the target exists), then the component invoking it must decide 
whether it is OK to overwrite the target. In the case of 
`WriteOperation.moveToOutput()`, it is clearly instructed to reconstruct the 
target from the source data, and I believe it should obey. The job should fail 
early, during launch, if this is not desired (e.g. the target dir must be 
empty).

A re-run batch job is not the only case. In streaming, in case of failure (or 
upgrade), the job can be started from a checkpoint. Depending on the checkpoint 
time and runner guarantees, some operations can be re-run and output files 
recreated. The operator must either delete the target before `rename()` or use 
`rename(overwrite = true)` if such a choice existed in the API 
(delete/overwrite, of course, only when the source is present and there are 
actually data to be moved).


was (Author: jozovilcek):
[~timrobertson100] I guess if rename() API is kept defensive (throw exception 
when target exists) that component invoking it must decide it is OK to 
overwrite target or not. In case of `WriteOperation.moveToOutput()`, it clearly 
is instructed to reconstruct the target from source data and I believe it 
should obey. Job should be failed soon, during the launch if this is not 
desired (e.g. target dir must be empty).

Re-run batch job is not the only case. In streaming, in case of failure ( or 
upgrade ), job can be started from checkpoint. Depending on checkpoint time and 
runner guarantees, some operations can be re-run and output files recreated 
again. Operator must either delete target before `rename()` or use 
`rename(overwrite = true)` if such choice would exists in API.

> Optimize FileBasedSink's WriteOperation.moveToOutput()
> --
>
> Key: BEAM-5036
> URL: https://issues.apache.org/jira/browse/BEAM-5036
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-files
>Affects Versions: 2.5.0
>Reporter: Jozef Vilcek
>Assignee: Tim Robertson
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The moveToOutput() method in FileBasedSink.WriteOperation implements the move 
> as copy+delete. It would be better to use rename(), which can be much more 
> efficient on some filesystems.
> The filesystem must support cross-directory rename. BEAM-4861 is related to 
> this for the case of the HDFS filesystem.
> The feature was discussed here:
> http://mail-archives.apache.org/mod_mbox/beam-dev/201807.mbox/%3CCAF9t7_4Mp54pQ+vRrJrBh9Vx0=uaknupzd_qdh_qdm9vxll...@mail.gmail.com%3E





[jira] [Comment Edited] (BEAM-5036) Optimize FileBasedSink's WriteOperation.moveToOutput()

2018-08-30 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16597108#comment-16597108
 ] 

Jozef Vilcek edited comment on BEAM-5036 at 8/30/18 6:43 AM:
-

[~timrobertson100] I guess if the rename() API is kept defensive (throwing an 
exception when the target exists), then the component invoking it must decide 
whether it is OK to overwrite the target. In the case of 
`WriteOperation.moveToOutput()`, it is clearly instructed to reconstruct the 
target from the source data, and I believe it should obey. The job should fail 
early, during launch, if this is not desired (e.g. the target dir must be 
empty).

A re-run batch job is not the only case. In streaming, in case of failure (or 
upgrade), the job can be started from a checkpoint. Depending on the checkpoint 
time and runner guarantees, some operations can be re-run and output files 
recreated. The operator must either delete the target before `rename()` or use 
`rename(overwrite = true)` if such a choice existed in the API.


was (Author: jozovilcek):
[~timrobertson100] I guess if rename() API is kept defensive (throw exception 
when target exists) that component invoking it must decide it is OK to 
overwrite target or not. In case of `WriteOperation.moveToOutput()`, it clearly 
is instructed to reconstruct the target from source data and I believe it 
should obey. Job should be failed soon, during the launch if this is not 
desired (e.g. target dir must be empty).

Re-run batch job is not the only case. In streaming, in case of failure job can 
be start from checkpoint. Depending on checkpoint time and runner guarantees, 
some operations can be re-run and output files recreated again. Operator must 
either delete target before `rename()` or use `rename(overwrite = true)` if 
such choice would exists in API.

> Optimize FileBasedSink's WriteOperation.moveToOutput()
> --
>
> Key: BEAM-5036
> URL: https://issues.apache.org/jira/browse/BEAM-5036
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-files
>Affects Versions: 2.5.0
>Reporter: Jozef Vilcek
>Assignee: Tim Robertson
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The moveToOutput() method in FileBasedSink.WriteOperation implements the move 
> as copy+delete. It would be better to use rename(), which can be much more 
> efficient on some filesystems.
> The filesystem must support cross-directory rename. BEAM-4861 is related to 
> this for the case of the HDFS filesystem.
> The feature was discussed here:
> http://mail-archives.apache.org/mod_mbox/beam-dev/201807.mbox/%3CCAF9t7_4Mp54pQ+vRrJrBh9Vx0=uaknupzd_qdh_qdm9vxll...@mail.gmail.com%3E





[jira] [Commented] (BEAM-5036) Optimize FileBasedSink's WriteOperation.moveToOutput()

2018-08-30 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16597108#comment-16597108
 ] 

Jozef Vilcek commented on BEAM-5036:


[~timrobertson100] I guess if the rename() API is kept defensive (throwing an 
exception when the target exists), then the component invoking it must decide 
whether it is OK to overwrite the target. In the case of 
`WriteOperation.moveToOutput()`, it is clearly instructed to reconstruct the 
target from the source data, and I believe it should obey. The job should fail 
early, during launch, if this is not desired (e.g. the target dir must be 
empty).

A re-run batch job is not the only case. In streaming, in case of failure the 
job can be started from a checkpoint. Depending on the checkpoint time and 
runner guarantees, some operations can be re-run and output files recreated. 
The operator must either delete the target before `rename()` or use 
`rename(overwrite = true)` if such a choice existed in the API.
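The distinction between a defensive rename and a rename(overwrite = true) can be sketched with java.nio.file, which happens to expose exactly these two behaviours. This is an analogy on the local filesystem, not Beam's FileSystem API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.*;

// Defensive move vs. overwriting move, using java.nio.file as a stand-in
// for the rename() semantics discussed above.
public class MoveSemantics {
    // Returns the final content of the target after attempting both moves.
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("move-demo");
            Path src = Files.writeString(dir.resolve("src"), "new data");
            Path dst = Files.writeString(dir.resolve("dst"), "old data");

            // Defensive rename: refuses to clobber an existing target.
            try {
                Files.move(src, dst);
            } catch (FileAlreadyExistsException expected) {
                // The caller must decide: delete dst first, or overwrite.
            }

            // The rename(overwrite = true) variant: replace the target.
            Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
            return Files.readString(dst);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A `moveToOutput()` restarted from a checkpoint corresponds to the second call: the target already exists from the previous attempt and must be replaced, not defended.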

> Optimize FileBasedSink's WriteOperation.moveToOutput()
> --
>
> Key: BEAM-5036
> URL: https://issues.apache.org/jira/browse/BEAM-5036
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-files
>Affects Versions: 2.5.0
>Reporter: Jozef Vilcek
>Assignee: Tim Robertson
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The moveToOutput() method in FileBasedSink.WriteOperation implements the move 
> as copy+delete. It would be better to use rename(), which can be much more 
> efficient on some filesystems.
> The filesystem must support cross-directory rename. BEAM-4861 is related to 
> this for the case of the HDFS filesystem.
> The feature was discussed here:
> http://mail-archives.apache.org/mod_mbox/beam-dev/201807.mbox/%3CCAF9t7_4Mp54pQ+vRrJrBh9Vx0=uaknupzd_qdh_qdm9vxll...@mail.gmail.com%3E





[jira] [Commented] (BEAM-5036) Optimize FileBasedSink's WriteOperation.moveToOutput()

2018-08-29 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596542#comment-16596542
 ] 

Jozef Vilcek commented on BEAM-5036:


Aha, I see. I did not know it used to work differently before. Maybe the 
"write to temp file -> promote to final destination" approach is new with 
WriteFiles?

Sure, every unnecessary IO is suboptimal. Agreed.

> Optimize FileBasedSink's WriteOperation.moveToOutput()
> --
>
> Key: BEAM-5036
> URL: https://issues.apache.org/jira/browse/BEAM-5036
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-files
>Affects Versions: 2.5.0
>Reporter: Jozef Vilcek
>Assignee: Tim Robertson
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The moveToOutput() method in FileBasedSink.WriteOperation implements the move 
> as copy+delete. It would be better to use rename(), which can be much more 
> efficient on some filesystems.
> The filesystem must support cross-directory rename. BEAM-4861 is related to 
> this for the case of the HDFS filesystem.
> The feature was discussed here:
> http://mail-archives.apache.org/mod_mbox/beam-dev/201807.mbox/%3CCAF9t7_4Mp54pQ+vRrJrBh9Vx0=uaknupzd_qdh_qdm9vxll...@mail.gmail.com%3E





[jira] [Commented] (BEAM-5036) Optimize FileBasedSink's WriteOperation.moveToOutput()

2018-08-29 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596491#comment-16596491
 ] 

Jozef Vilcek commented on BEAM-5036:


As far as I understand this, there can only be a gain, no performance 
degradation. The gain is for filesystems with an efficient rename, like HDFS. 
For GFS, things should stay the same: `moveToOutput()` currently does a `copy` 
of the data + a `delete` of the source. The new way would be a `rename`, and 
GFS implements that (as [~timrobertson100] states above) as `copy` + `delete` 
too.

So jobs should not be affected? Am I missing something?

> Optimize FileBasedSink's WriteOperation.moveToOutput()
> --
>
> Key: BEAM-5036
> URL: https://issues.apache.org/jira/browse/BEAM-5036
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-files
>Affects Versions: 2.5.0
>Reporter: Jozef Vilcek
>Assignee: Tim Robertson
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The moveToOutput() method in FileBasedSink.WriteOperation implements the move 
> as copy+delete. It would be better to use rename(), which can be much more 
> efficient on some filesystems.
> The filesystem must support cross-directory rename. BEAM-4861 is related to 
> this for the case of the HDFS filesystem.
> The feature was discussed here:
> http://mail-archives.apache.org/mod_mbox/beam-dev/201807.mbox/%3CCAF9t7_4Mp54pQ+vRrJrBh9Vx0=uaknupzd_qdh_qdm9vxll...@mail.gmail.com%3E





[jira] [Commented] (BEAM-4861) Hadoop Filesystem silently fails

2018-08-28 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594868#comment-16594868
 ] 

Jozef Vilcek commented on BEAM-4861:


Yes, makes sense to me.

> Hadoop Filesystem silently fails
> 
>
> Key: BEAM-4861
> URL: https://issues.apache.org/jira/browse/BEAM-4861
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-hadoop
>Reporter: Jozef Vilcek
>Assignee: Chamikara Jayalath
>Priority: Major
>
> Hi,
> Beam FileSystem operations copy, rename and delete return void in the SDK. 
> Hadoop's native filesystem operations are not void; they return a boolean. The 
> current implementation in Beam ignores the result and passes as long as no 
> exception is thrown.
> I got burned by this when using 'rename' to do a 'move' operation on HDFS. If 
> the target directory does not exist, the operation returns false and does not 
> touch the file.
> [https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L148]





[jira] [Created] (BEAM-5247) Remove slf4j-simple binding from dependencies

2018-08-28 Thread Jozef Vilcek (JIRA)
Jozef Vilcek created BEAM-5247:
--

 Summary: Remove slf4j-simple binding from dependencies
 Key: BEAM-5247
 URL: https://issues.apache.org/jira/browse/BEAM-5247
 Project: Beam
  Issue Type: Improvement
  Components: runner-flink
Reporter: Jozef Vilcek
Assignee: Aljoscha Krettek


The Flink runner declares a slf4j-simple binding in its dependencies. This can 
break the logging of applications if they have their own binding and do not 
exclude this one from Beam.





[jira] [Created] (BEAM-5246) Beam metrics exported as flink metrics are not correct

2018-08-28 Thread Jozef Vilcek (JIRA)
Jozef Vilcek created BEAM-5246:
--

 Summary: Beam metrics exported as flink metrics are not correct
 Key: BEAM-5246
 URL: https://issues.apache.org/jira/browse/BEAM-5246
 Project: Beam
  Issue Type: Bug
  Components: runner-flink
Affects Versions: 2.6.0
Reporter: Jozef Vilcek
Assignee: Aljoscha Krettek


In the Flink UI and the Flink native MetricReporter, I am seeing too many 
instances of my Beam metric counter. It looks like the counter is materialised 
for every operator running within the task, although it is emitted from only 
one Beam step (which should map to one operator?). This produces double 
counting.

With a bit of debugging I noticed this happens for streaming jobs; in batch I 
was not able to reproduce it. The problem might be in FlinkMetricContainer.

[https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java#L86]

The update seems to be called from operators after finishing a bundle. Data 
from the accumulator are flushed to `runtimeContext.getMetricGroup()`. The 
scope of the accumulator seems to differ from that of the metricGroup: across 
different calls the scope components change, especially the operatorID. It 
seems that during the run, `metricResult.getStep()` does not match the 
operatorName of the metricGroup where the metric is being pushed.
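The fix later proposed on this issue, forwarding only the metrics that belong to the current step, can be sketched as a plain filter. The class name, key format, and map-based shape here are illustrative assumptions, not FlinkMetricContainer's actual API.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: when flushing accumulated metrics to an operator's metric group,
// forward only those whose step matches that operator, so the same counter
// is not re-registered by every operator in the task.
class MetricFlush {
    // Assumes keys of the form "stepName/counterName" (illustrative).
    static Map<String, Long> forStep(Map<String, Long> accumulated, String stepName) {
        Map<String, Long> out = new HashMap<>();
        for (Map.Entry<String, Long> e : accumulated.entrySet()) {
            if (e.getKey().startsWith(stepName + "/")) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }
}
```

Without such a filter, every operator flushing the shared accumulator would report every step's counters, which matches the double counting observed above.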





[jira] [Commented] (BEAM-5239) Allow configure latencyTrackingInterval

2018-08-27 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593612#comment-16593612
 ] 

Jozef Vilcek commented on BEAM-5239:


Going to create a PR.

> Allow configure latencyTrackingInterval
> ---
>
> Key: BEAM-5239
> URL: https://issues.apache.org/jira/browse/BEAM-5239
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Affects Versions: 2.6.0
>Reporter: Jozef Vilcek
>Assignee: Aljoscha Krettek
>Priority: Major
>
> Because of FLINK-10226, we need to be able to set 
> latencyTrackingConfiguration for Flink via FlinkPipelineOptions.





[jira] [Created] (BEAM-5239) Allow configure latencyTrackingInterval

2018-08-27 Thread Jozef Vilcek (JIRA)
Jozef Vilcek created BEAM-5239:
--

 Summary: Allow configure latencyTrackingInterval
 Key: BEAM-5239
 URL: https://issues.apache.org/jira/browse/BEAM-5239
 Project: Beam
  Issue Type: New Feature
  Components: runner-flink
Affects Versions: 2.6.0
Reporter: Jozef Vilcek
Assignee: Aljoscha Krettek


Because of FLINK-10226, we need to be able to set latencyTrackingConfiguration 
for Flink via FlinkPipelineOptions.





[jira] [Commented] (BEAM-4861) Hadoop Filesystem silently fails

2018-08-27 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593447#comment-16593447
 ] 

Jozef Vilcek commented on BEAM-4861:


For an unsuccessful operation, I would throw an exception as well. In practice, 
this is what helpers around the native HDFS boolean methods mostly do: fail, 
and investigate later what went wrong.

For rename, creating directories where necessary sounds good. Plus, with 
overwrites allowed, the behaviour would be consistent with what I observe for 
"normal file create" operations. Allowing overwrite is maybe needed for cases 
of restarting jobs from snapshots, which can lead to reprocessing and 
recreating the same outputs again? Not sure.

> Hadoop Filesystem silently fails
> 
>
> Key: BEAM-4861
> URL: https://issues.apache.org/jira/browse/BEAM-4861
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-hadoop
>Reporter: Jozef Vilcek
>Assignee: Chamikara Jayalath
>Priority: Major
>
> Hi,
> Beam FileSystem operations copy, rename and delete return void in the SDK. 
> Hadoop's native filesystem operations are not void; they return a boolean. The 
> current implementation in Beam ignores the result and passes as long as no 
> exception is thrown.
> I got burned by this when using 'rename' to do a 'move' operation on HDFS. If 
> the target directory does not exist, the operation returns false and does not 
> touch the file.
> [https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L148]





[jira] [Commented] (BEAM-5180) Broken FileResultCoder via parseSchema change

2018-08-26 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592931#comment-16592931
 ] 

Jozef Vilcek commented on BEAM-5180:


Or put a strong requirement that every path declares a `scheme`. That way the 
default_scheme could be dropped and never guessed:
[https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L443]

Windows paths will need to be `file:///c:/something/in/here`.

But my view is limited, so this might not be feasible for reasons I do not 
foresee. Just an idea.

> Broken FileResultCoder via parseSchema change
> -
>
> Key: BEAM-5180
> URL: https://issues.apache.org/jira/browse/BEAM-5180
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.6.0
>Reporter: Jozef Vilcek
>Assignee: Ankur Goenka
>Priority: Blocker
> Fix For: 2.7.0
>
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Recently this commit
> [https://github.com/apache/beam/commit/3fff58c21f94415f3397e185377e36d3df662384]
> introduced stricter schema parsing, which breaks the contract between 
> _FileResultCoder_ and _FileSystems.matchNewResource()_.
> The coder takes a _ResourceId_, serializes it via the `_toString_` method, and 
> then relies on the filesystem being able to parse it back. Requiring a strict 
> _scheme://_ breaks this at least for the Hadoop filesystem, which uses a _URI_ 
> for its _ResourceId_ and produces _toString()_ in the form `_hdfs:/some/path_`.
> I guess the _ResourceIdCoder_ suffers from the same problem.





[jira] [Comment Edited] (BEAM-5180) Broken FileResultCoder via parseSchema change

2018-08-25 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592486#comment-16592486
 ] 

Jozef Vilcek edited comment on BEAM-5180 at 8/25/18 6:11 AM:
-

Sorry, I am unable to see how // is enforced. I see
{noformat}
[scheme:][//authority][path][?query][#fragment]{noformat}
which I interpret as: if the authority is not present, the `//` does not have 
to be there either.

In reality, this is the behaviour:
{code:java}
System.out.println(new java.net.URI("hdfs:/path/to/file"));
System.out.println(new java.net.URI("hdfs:///path/to/file"));
System.out.println(new java.net.URI("hdfs:///path/to/file").resolve("something-else"));
{code}
prints
{noformat}
hdfs:/path/to/file
hdfs:///path/to/file
hdfs:/path/to/something-else
{noformat}
So even if the user specifies hdfs:// paths as input, operations within Beam 
will translate them to hdfs:/


was (Author: jozovilcek):
Sorry, I am unable to see how // is enforced. I see 
{noformat}
[scheme:][//authority][path][?query][#fragment]{noformat}
which I interpret as if authority is not present, `//` does not have to be 
there either.

In reality, this is the behaviour:
{code:java}
System.out.println(new java.net.URI("hdfs:/path/to/file"));
System.out.println(new java.net.URI("hdfs:///path/to/file"));
System.out.println(new 
java.net.URI("hdfs:///path/to/file").resolve("something-else"));
{code}
prints
{code:java}
hdfs:/path/to/file
hdfs:///path/to/file
hdfs:/path/to/something-else
{code}
So even if user specifies hdfs://  paths as entry, it will be due to operations 
within the beam translated to hdfs:/

> Broken FileResultCoder via parseSchema change
> -
>
> Key: BEAM-5180
> URL: https://issues.apache.org/jira/browse/BEAM-5180
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.6.0
>Reporter: Jozef Vilcek
>Assignee: Ankur Goenka
>Priority: Blocker
> Fix For: 2.7.0
>
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Recently this commit
> [https://github.com/apache/beam/commit/3fff58c21f94415f3397e185377e36d3df662384]
> introduced stricter schema parsing, which breaks the contract between 
> _FileResultCoder_ and _FileSystems.matchNewResource()_.
> The coder takes a _ResourceId_, serializes it via the `_toString_` method, and 
> then relies on the filesystem being able to parse it back. Requiring a strict 
> _scheme://_ breaks this at least for the Hadoop filesystem, which uses a _URI_ 
> for its _ResourceId_ and produces _toString()_ in the form `_hdfs:/some/path_`.
> I guess the _ResourceIdCoder_ suffers from the same problem.





[jira] [Commented] (BEAM-5180) Broken FileResultCoder via parseSchema change

2018-08-25 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592486#comment-16592486
 ] 

Jozef Vilcek commented on BEAM-5180:


Sorry, I am unable to see how // is enforced. I see
{noformat}
[scheme:][//authority][path][?query][#fragment]{noformat}
which I interpret as: if the authority is not present, the `//` does not have 
to be there either.

In reality, this is the behaviour:
{code:java}
System.out.println(new java.net.URI("hdfs:/path/to/file"));
System.out.println(new java.net.URI("hdfs:///path/to/file"));
System.out.println(new java.net.URI("hdfs:///path/to/file").resolve("something-else"));
{code}
prints
{noformat}
hdfs:/path/to/file
hdfs:///path/to/file
hdfs:/path/to/something-else
{noformat}
So even if the user specifies hdfs:// paths as input, operations within Beam 
will translate them to hdfs:/

> Broken FileResultCoder via parseSchema change
> -
>
> Key: BEAM-5180
> URL: https://issues.apache.org/jira/browse/BEAM-5180
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.6.0
>Reporter: Jozef Vilcek
>Assignee: Ankur Goenka
>Priority: Blocker
> Fix For: 2.7.0
>
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Recently this commit
> [https://github.com/apache/beam/commit/3fff58c21f94415f3397e185377e36d3df662384]
> introduced stricter schema parsing, which breaks the contract between 
> _FileResultCoder_ and _FileSystems.matchNewResource()_.
> The coder takes a _ResourceId_, serializes it via the `_toString_` method, and 
> then relies on the filesystem being able to parse it back. Requiring a strict 
> _scheme://_ breaks this at least for the Hadoop filesystem, which uses a _URI_ 
> for its _ResourceId_ and produces _toString()_ in the form `_hdfs:/some/path_`.
> I guess the _ResourceIdCoder_ suffers from the same problem.





[jira] [Commented] (BEAM-5180) Broken FileResultCoder via parseSchema change

2018-08-22 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588727#comment-16588727
 ] 

Jozef Vilcek commented on BEAM-5180:


Any thoughts on this? Is {{hdfs:/}} not a valid URI or ResourceId? Given that the 
authority component is optional, the extra {{//}} can be dropped; {{java.net.URI}} 
parses it just fine.
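As a quick check with the plain JDK (no Beam classes; the class name is mine), `java.net.URI` accepts the single-slash form and still reports the scheme and path intact:

```java
import java.net.URI;

public class SingleSlashUriDemo {
    public static void main(String[] args) {
        // The single-slash form parses fine: scheme and path are present,
        // only the authority component is absent.
        URI noAuthority = URI.create("hdfs:/path/to/file");
        URI emptyAuthority = URI.create("hdfs:///path/to/file");
        System.out.println(noAuthority.getScheme());  // hdfs
        System.out.println(noAuthority.getPath());    // /path/to/file
        System.out.println(emptyAuthority.getPath()); // /path/to/file
    }
}
```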

> Broken FileResultCoder via parseSchema change
> -
>
> Key: BEAM-5180
> URL: https://issues.apache.org/jira/browse/BEAM-5180
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.6.0
>Reporter: Jozef Vilcek
>Assignee: Kenneth Knowles
>Priority: Blocker
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Recently this commit
> [https://github.com/apache/beam/commit/3fff58c21f94415f3397e185377e36d3df662384]
> introduced stricter schema parsing, which breaks the contract between 
> _FileResultCoder_ and _FileSystems.matchNewResource()_.
> The coder takes a _ResourceId_, serializes it via the `_toString_` method, and 
> then relies on the filesystem being able to parse it back again. Requiring a 
> strict _scheme://_ breaks this at least for the Hadoop filesystem, which uses a 
> _URI_ for its _ResourceId_ and produces _toString()_ output in the form 
> `_hdfs:/some/path_`.
> I guess the _ResourceIdCoder_ suffers from the same problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-2277) IllegalArgumentException when using Hadoop file system for WordCount example.

2018-08-22 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588725#comment-16588725
 ] 

Jozef Vilcek commented on BEAM-2277:


[~timrobertson100] I believe the root cause and the fix for this are BEAM-5180 and 
the PR linked there.

> IllegalArgumentException when using Hadoop file system for WordCount example.
> -
>
> Key: BEAM-2277
> URL: https://issues.apache.org/jira/browse/BEAM-2277
> Project: Beam
>  Issue Type: Bug
>  Components: z-do-not-use-sdk-java-extensions
>Affects Versions: 2.6.0
>Reporter: Aviem Zur
>Assignee: Aviem Zur
>Priority: Blocker
> Fix For: 2.0.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> IllegalArgumentException when using Hadoop file system for WordCount example.
> Occurred when running WordCount example using Spark runner on a YARN cluster.
> Command-line arguments:
> {code:none}
> --runner=SparkRunner --inputFile=hdfs:///user/myuser/kinglear.txt 
> --output=hdfs:///user/myuser/wc/wc
> {code}
> Stack trace:
> {code:none}
> java.lang.IllegalArgumentException: Expect srcResourceIds and destResourceIds 
> have the same scheme, but received file, hdfs.
>   at 
> org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
>   at 
> org.apache.beam.sdk.io.FileSystems.validateSrcDestLists(FileSystems.java:394)
>   at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:236)
>   at 
> org.apache.beam.sdk.io.FileBasedSink$WriteOperation.copyToOutputFiles(FileBasedSink.java:626)
>   at 
> org.apache.beam.sdk.io.FileBasedSink$WriteOperation.finalize(FileBasedSink.java:516)
>   at 
> org.apache.beam.sdk.io.WriteFiles$2.processElement(WriteFiles.java:592)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (BEAM-5180) Broken FileResultCoder via parseSchema change

2018-08-21 Thread Jozef Vilcek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jozef Vilcek updated BEAM-5180:
---
Comment: was deleted

(was: Submitted a pull request for BEAM-2277 which should fix the root cause 
and keep the scheme parsing more strict as it is.)

> Broken FileResultCoder via parseSchema change
> -
>
> Key: BEAM-5180
> URL: https://issues.apache.org/jira/browse/BEAM-5180
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.6.0
>Reporter: Jozef Vilcek
>Assignee: Kenneth Knowles
>Priority: Blocker
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> Recently this commit
> [https://github.com/apache/beam/commit/3fff58c21f94415f3397e185377e36d3df662384]
> introduced stricter schema parsing, which breaks the contract between 
> _FileResultCoder_ and _FileSystems.matchNewResource()_.
> The coder takes a _ResourceId_, serializes it via the `_toString_` method, and 
> then relies on the filesystem being able to parse it back again. Requiring a 
> strict _scheme://_ breaks this at least for the Hadoop filesystem, which uses a 
> _URI_ for its _ResourceId_ and produces _toString()_ output in the form 
> `_hdfs:/some/path_`.
> I guess the _ResourceIdCoder_ suffers from the same problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (BEAM-2277) IllegalArgumentException when using Hadoop file system for WordCount example.

2018-08-21 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587423#comment-16587423
 ] 

Jozef Vilcek edited comment on BEAM-2277 at 8/21/18 1:31 PM:
-

So after more investigation: the URI backing the HadoopResourceId also drops the 
'//' on an empty authority when calling HadoopResourceId.resolve(), and produces 
string versions of the resource in the "hdfs:/path" form. This is not accepted 
back as the hdfs scheme by FileSystems.matchNewResource(). The question is what 
an elegant fix for that would be.
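The `resolve()` behaviour can be reproduced with the JDK alone (no Beam classes involved; the class name is mine):

```java
import java.net.URI;

public class ResolveDemo {
    public static void main(String[] args) {
        URI base = URI.create("hdfs:///path/to/file");
        // resolve() normalizes away the empty authority: the result
        // prints as "hdfs:/path/to/something-else", not "hdfs:///...".
        URI resolved = base.resolve("something-else");
        System.out.println(resolved); // hdfs:/path/to/something-else
    }
}
```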


was (Author: jozovilcek):
So after ore investigation, URI, which is backing the HadoopResourceId does 
also drope empty authority hen calling HadoopResourceIr.resolve() and produce 
string versions of resource in "hdfs:/path" form. This is not accepted back as 
hdfs scheme by FileSystems.matchNewResource(). The question is, what would be 
an elegant fix to that.

> IllegalArgumentException when using Hadoop file system for WordCount example.
> -
>
> Key: BEAM-2277
> URL: https://issues.apache.org/jira/browse/BEAM-2277
> Project: Beam
>  Issue Type: Bug
>  Components: z-do-not-use-sdk-java-extensions
>Affects Versions: 2.6.0
>Reporter: Aviem Zur
>Assignee: Aviem Zur
>Priority: Blocker
> Fix For: 2.0.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> IllegalArgumentException when using Hadoop file system for WordCount example.
> Occurred when running WordCount example using Spark runner on a YARN cluster.
> Command-line arguments:
> {code:none}
> --runner=SparkRunner --inputFile=hdfs:///user/myuser/kinglear.txt 
> --output=hdfs:///user/myuser/wc/wc
> {code}
> Stack trace:
> {code:none}
> java.lang.IllegalArgumentException: Expect srcResourceIds and destResourceIds 
> have the same scheme, but received file, hdfs.
>   at 
> org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
>   at 
> org.apache.beam.sdk.io.FileSystems.validateSrcDestLists(FileSystems.java:394)
>   at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:236)
>   at 
> org.apache.beam.sdk.io.FileBasedSink$WriteOperation.copyToOutputFiles(FileBasedSink.java:626)
>   at 
> org.apache.beam.sdk.io.FileBasedSink$WriteOperation.finalize(FileBasedSink.java:516)
>   at 
> org.apache.beam.sdk.io.WriteFiles$2.processElement(WriteFiles.java:592)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-2277) IllegalArgumentException when using Hadoop file system for WordCount example.

2018-08-21 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587423#comment-16587423
 ] 

Jozef Vilcek commented on BEAM-2277:


So after more investigation: the URI backing the HadoopResourceId also drops the 
empty authority when calling HadoopResourceId.resolve(), and produces string 
versions of the resource in the "hdfs:/path" form. This is not accepted back as 
the hdfs scheme by FileSystems.matchNewResource(). The question is what an 
elegant fix for that would be.

> IllegalArgumentException when using Hadoop file system for WordCount example.
> -
>
> Key: BEAM-2277
> URL: https://issues.apache.org/jira/browse/BEAM-2277
> Project: Beam
>  Issue Type: Bug
>  Components: z-do-not-use-sdk-java-extensions
>Affects Versions: 2.6.0
>Reporter: Aviem Zur
>Assignee: Aviem Zur
>Priority: Blocker
> Fix For: 2.0.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> IllegalArgumentException when using Hadoop file system for WordCount example.
> Occurred when running WordCount example using Spark runner on a YARN cluster.
> Command-line arguments:
> {code:none}
> --runner=SparkRunner --inputFile=hdfs:///user/myuser/kinglear.txt 
> --output=hdfs:///user/myuser/wc/wc
> {code}
> Stack trace:
> {code:none}
> java.lang.IllegalArgumentException: Expect srcResourceIds and destResourceIds 
> have the same scheme, but received file, hdfs.
>   at 
> org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
>   at 
> org.apache.beam.sdk.io.FileSystems.validateSrcDestLists(FileSystems.java:394)
>   at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:236)
>   at 
> org.apache.beam.sdk.io.FileBasedSink$WriteOperation.copyToOutputFiles(FileBasedSink.java:626)
>   at 
> org.apache.beam.sdk.io.FileBasedSink$WriteOperation.finalize(FileBasedSink.java:516)
>   at 
> org.apache.beam.sdk.io.WriteFiles$2.processElement(WriteFiles.java:592)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-5180) Broken FileResultCoder via parseSchema change

2018-08-21 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587298#comment-16587298
 ] 

Jozef Vilcek commented on BEAM-5180:


Submitted a pull request for BEAM-2277 which should fix the root cause and keep 
the scheme parsing more strict as it is.

> Broken FileResultCoder via parseSchema change
> -
>
> Key: BEAM-5180
> URL: https://issues.apache.org/jira/browse/BEAM-5180
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.6.0
>Reporter: Jozef Vilcek
>Assignee: Kenneth Knowles
>Priority: Blocker
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> Recently this commit
> [https://github.com/apache/beam/commit/3fff58c21f94415f3397e185377e36d3df662384]
> introduced stricter schema parsing, which breaks the contract between 
> _FileResultCoder_ and _FileSystems.matchNewResource()_.
> The coder takes a _ResourceId_, serializes it via the `_toString_` method, and 
> then relies on the filesystem being able to parse it back again. Requiring a 
> strict _scheme://_ breaks this at least for the Hadoop filesystem, which uses a 
> _URI_ for its _ResourceId_ and produces _toString()_ output in the form 
> `_hdfs:/some/path_`.
> I guess the _ResourceIdCoder_ suffers from the same problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-5180) Broken FileResultCoder via parseSchema change

2018-08-21 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587285#comment-16587285
 ] 

Jozef Vilcek commented on BEAM-5180:


For reference, this is possibly the root cause that this change in scheme parsing 
triggered:
https://issues.apache.org/jira/browse/BEAM-2277?focusedCommentId=16587202=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16587202

> Broken FileResultCoder via parseSchema change
> -
>
> Key: BEAM-5180
> URL: https://issues.apache.org/jira/browse/BEAM-5180
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.6.0
>Reporter: Jozef Vilcek
>Assignee: Kenneth Knowles
>Priority: Blocker
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> Recently this commit
> [https://github.com/apache/beam/commit/3fff58c21f94415f3397e185377e36d3df662384]
> introduced stricter schema parsing, which breaks the contract between 
> _FileResultCoder_ and _FileSystems.matchNewResource()_.
> The coder takes a _ResourceId_, serializes it via the `_toString_` method, and 
> then relies on the filesystem being able to parse it back again. Requiring a 
> strict _scheme://_ breaks this at least for the Hadoop filesystem, which uses a 
> _URI_ for its _ResourceId_ and produces _toString()_ output in the form 
> `_hdfs:/some/path_`.
> I guess the _ResourceIdCoder_ suffers from the same problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (BEAM-2277) IllegalArgumentException when using Hadoop file system for WordCount example.

2018-08-21 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587202#comment-16587202
 ] 

Jozef Vilcek edited comment on BEAM-2277 at 8/21/18 10:54 AM:
--

Is anyone else having this problem when using HDFS paths with vs. without an 
authority, i.e. *hdfs:///path/to/dir* vs. *hdfs://ha-nn/path/to/dir*?

I suspect the problem could be the dropping of the empty authority here:

[https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L260]

which, for
{code:java}
FileSystems.matchNewResource("hdfs:///path/to/dir", false)
{code}
produces
{code:java}
hdfs:/path/to/dir
{code}
Reading such a path back via FileSystems.matchNewResource() produces a 
ResourceId with the "file" scheme because of the change in BEAM-5180.

 


was (Author: jozovilcek):
Is anyone have this problem if it is using HDFS paths in form with vs without 
authority? *hdfs:///path/to/dir*    vs   *hdfs://ha-nn/path/to/dir* ?


I suspect the problem could be drop empty authority here:

[https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L260]

Which does for
{code:java}
FileSystems.matchNewResource("hdfs:///path/to/dir", false)
{code}
produce

 

 
{code:java}
hdfs:/path/to/dir
{code}
Reading back such path via  FileSystems.matchNewResource()  produce a 
ResourceId with "file" schema because of the chance in BEAM-5180

 

> IllegalArgumentException when using Hadoop file system for WordCount example.
> -
>
> Key: BEAM-2277
> URL: https://issues.apache.org/jira/browse/BEAM-2277
> Project: Beam
>  Issue Type: Bug
>  Components: z-do-not-use-sdk-java-extensions
>Affects Versions: 2.6.0
>Reporter: Aviem Zur
>Assignee: Aviem Zur
>Priority: Blocker
> Fix For: 2.0.0
>
>
> IllegalArgumentException when using Hadoop file system for WordCount example.
> Occurred when running WordCount example using Spark runner on a YARN cluster.
> Command-line arguments:
> {code:none}
> --runner=SparkRunner --inputFile=hdfs:///user/myuser/kinglear.txt 
> --output=hdfs:///user/myuser/wc/wc
> {code}
> Stack trace:
> {code:none}
> java.lang.IllegalArgumentException: Expect srcResourceIds and destResourceIds 
> have the same scheme, but received file, hdfs.
>   at 
> org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
>   at 
> org.apache.beam.sdk.io.FileSystems.validateSrcDestLists(FileSystems.java:394)
>   at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:236)
>   at 
> org.apache.beam.sdk.io.FileBasedSink$WriteOperation.copyToOutputFiles(FileBasedSink.java:626)
>   at 
> org.apache.beam.sdk.io.FileBasedSink$WriteOperation.finalize(FileBasedSink.java:516)
>   at 
> org.apache.beam.sdk.io.WriteFiles$2.processElement(WriteFiles.java:592)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-2277) IllegalArgumentException when using Hadoop file system for WordCount example.

2018-08-21 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587202#comment-16587202
 ] 

Jozef Vilcek commented on BEAM-2277:


Is anyone else having this problem when using HDFS paths with vs. without an 
authority, i.e. *hdfs:///path/to/dir* vs. *hdfs://ha-nn/path/to/dir*?


I suspect the problem could be the dropping of the empty authority here:

[https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L260]

which, for
{code:java}
FileSystems.matchNewResource("hdfs:///path/to/dir", false)
{code}
produces
{code:java}
hdfs:/path/to/dir
{code}
Reading such a path back via FileSystems.matchNewResource() produces a 
ResourceId with the "file" scheme because of the change in BEAM-5180.

 

> IllegalArgumentException when using Hadoop file system for WordCount example.
> -
>
> Key: BEAM-2277
> URL: https://issues.apache.org/jira/browse/BEAM-2277
> Project: Beam
>  Issue Type: Bug
>  Components: z-do-not-use-sdk-java-extensions
>Affects Versions: 2.6.0
>Reporter: Aviem Zur
>Assignee: Aviem Zur
>Priority: Blocker
> Fix For: 2.0.0
>
>
> IllegalArgumentException when using Hadoop file system for WordCount example.
> Occurred when running WordCount example using Spark runner on a YARN cluster.
> Command-line arguments:
> {code:none}
> --runner=SparkRunner --inputFile=hdfs:///user/myuser/kinglear.txt 
> --output=hdfs:///user/myuser/wc/wc
> {code}
> Stack trace:
> {code:none}
> java.lang.IllegalArgumentException: Expect srcResourceIds and destResourceIds 
> have the same scheme, but received file, hdfs.
>   at 
> org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
>   at 
> org.apache.beam.sdk.io.FileSystems.validateSrcDestLists(FileSystems.java:394)
>   at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:236)
>   at 
> org.apache.beam.sdk.io.FileBasedSink$WriteOperation.copyToOutputFiles(FileBasedSink.java:626)
>   at 
> org.apache.beam.sdk.io.FileBasedSink$WriteOperation.finalize(FileBasedSink.java:516)
>   at 
> org.apache.beam.sdk.io.WriteFiles$2.processElement(WriteFiles.java:592)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-4142) HadoopResourceIdTest has had a masked failure

2018-08-21 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587139#comment-16587139
 ] 

Jozef Vilcek commented on BEAM-4142:


What is the problem here? Having no tests for HadoopResourceId is not healthy. 
Maybe it could have prevented BEAM-5180 and BEAM-2277; I can't tell for sure.

> HadoopResourceIdTest has had a masked failure
> -
>
> Key: BEAM-4142
> URL: https://issues.apache.org/jira/browse/BEAM-4142
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-hadoop
>Reporter: Kenneth Knowles
>Priority: Major
>  Labels: sickbay
>
> Sickbayed in https://github.com/apache/beam/pull/5161, the test should be 
> fixed and no longer ignored.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-5180) Broken FileResultCoder via parseSchema change

2018-08-20 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16585871#comment-16585871
 ] 

Jozef Vilcek commented on BEAM-5180:


I have added a simple roll-back PR so I can use it in my patched version and 
track the bug until the best solution is decided.

> Broken FileResultCoder via parseSchema change
> -
>
> Key: BEAM-5180
> URL: https://issues.apache.org/jira/browse/BEAM-5180
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.6.0
>Reporter: Jozef Vilcek
>Assignee: Kenneth Knowles
>Priority: Blocker
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Recently this commit
> [https://github.com/apache/beam/commit/3fff58c21f94415f3397e185377e36d3df662384]
> introduced stricter schema parsing, which breaks the contract between 
> _FileResultCoder_ and _FileSystems.matchNewResource()_.
> The coder takes a _ResourceId_, serializes it via the `_toString_` method, and 
> then relies on the filesystem being able to parse it back again. Requiring a 
> strict _scheme://_ breaks this at least for the Hadoop filesystem, which uses a 
> _URI_ for its _ResourceId_ and produces _toString()_ output in the form 
> `_hdfs:/some/path_`.
> I guess the _ResourceIdCoder_ suffers from the same problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-5180) Broken FileResultCoder via parseSchema change

2018-08-20 Thread Jozef Vilcek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jozef Vilcek updated BEAM-5180:
---
Description: 
Recently this commit

[https://github.com/apache/beam/commit/3fff58c21f94415f3397e185377e36d3df662384]

introduced stricter schema parsing, which breaks the contract between 
_FileResultCoder_ and _FileSystems.matchNewResource()_.

The coder takes a _ResourceId_, serializes it via the `_toString_` method, and then 
relies on the filesystem being able to parse it back again. Requiring a strict 
_scheme://_ breaks this at least for the Hadoop filesystem, which uses a _URI_ for 
its _ResourceId_ and produces _toString()_ output in the form `_hdfs:/some/path_`.

I guess the _ResourceIdCoder_ suffers from the same problem.

  was:
Recently this commit

https://github.com/apache/beam/commit/3fff58c21f94415f3397e185377e36d3df662384

introduced more strict schema parsing which is breaking the contract between 
_FileResultCoder_ and _FileSystems.matchNewResource()_.

Coder takes _ResourceId_ and serialize it via `_toString_` methods and then 
relies on filesystem being able to parse it back again. Having strict 
_scheme://_ breaks this at least for Hadoop filesystem which use _URI_ for 
_ResourceId_ and produce _toString()_ in form of `_hdfs:/some/path_`

I guess the _ResourceIdCoder_ is suffering the same problem.

Either scheme parsing should be less strict or ResourceId.toString() for hadoop 
fixed


> Broken FileResultCoder via parseSchema change
> -
>
> Key: BEAM-5180
> URL: https://issues.apache.org/jira/browse/BEAM-5180
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.6.0
>Reporter: Jozef Vilcek
>Assignee: Kenneth Knowles
>Priority: Blocker
>
> Recently this commit
> [https://github.com/apache/beam/commit/3fff58c21f94415f3397e185377e36d3df662384]
> introduced stricter schema parsing, which breaks the contract between 
> _FileResultCoder_ and _FileSystems.matchNewResource()_.
> The coder takes a _ResourceId_, serializes it via the `_toString_` method, and 
> then relies on the filesystem being able to parse it back again. Requiring a 
> strict _scheme://_ breaks this at least for the Hadoop filesystem, which uses a 
> _URI_ for its _ResourceId_ and produces _toString()_ output in the form 
> `_hdfs:/some/path_`.
> I guess the _ResourceIdCoder_ suffers from the same problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-5180) Broken FileResultCoder via parseSchema change

2018-08-20 Thread Jozef Vilcek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jozef Vilcek updated BEAM-5180:
---
Description: 
Recently this commit

https://github.com/apache/beam/commit/3fff58c21f94415f3397e185377e36d3df662384

introduced stricter schema parsing, which breaks the contract between 
_FileResultCoder_ and _FileSystems.matchNewResource()_.

The coder takes a _ResourceId_, serializes it via the `_toString_` method, and then 
relies on the filesystem being able to parse it back again. Requiring a strict 
_scheme://_ breaks this at least for the Hadoop filesystem, which uses a _URI_ for 
its _ResourceId_ and produces _toString()_ output in the form `_hdfs:/some/path_`.

I guess the _ResourceIdCoder_ suffers from the same problem.

Either the scheme parsing should be made less strict or ResourceId.toString() for 
the Hadoop filesystem should be fixed.

  was:
Recently this commit introduced more strict schema parsing which is breaking 
the contract between `FileResultCoder` and `FileSystems.matchNewResource()`.

Coder takes `ResourceId` and serialize it via `toString` methods and then 
relies on filesystem being able to parse it back again. Having strict 
`scheme://` breaks this at least for `Hadoop` filesystem which use `URI for 
`ResourceId` and produce `toString()` in form of `hdfs:/some/path`

I guess the `ResourceIdCoder` is suffering the same problem.

Either scheme parsing should be less strict or `ResourceId.toString()` for 
`hadoop` fixed


> Broken FileResultCoder via parseSchema change
> -
>
> Key: BEAM-5180
> URL: https://issues.apache.org/jira/browse/BEAM-5180
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.6.0
>Reporter: Jozef Vilcek
>Assignee: Kenneth Knowles
>Priority: Blocker
>
> Recently this commit
> https://github.com/apache/beam/commit/3fff58c21f94415f3397e185377e36d3df662384
> introduced stricter schema parsing, which breaks the contract between 
> _FileResultCoder_ and _FileSystems.matchNewResource()_.
> The coder takes a _ResourceId_, serializes it via the `_toString_` method, and 
> then relies on the filesystem being able to parse it back again. Requiring a 
> strict _scheme://_ breaks this at least for the Hadoop filesystem, which uses a 
> _URI_ for its _ResourceId_ and produces _toString()_ output in the form 
> `_hdfs:/some/path_`.
> I guess the _ResourceIdCoder_ suffers from the same problem.
> Either the scheme parsing should be made less strict or ResourceId.toString() 
> for the Hadoop filesystem should be fixed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-5180) Broken FileResultCoder via parseSchema change

2018-08-20 Thread Jozef Vilcek (JIRA)
Jozef Vilcek created BEAM-5180:
--

 Summary: Broken FileResultCoder via parseSchema change
 Key: BEAM-5180
 URL: https://issues.apache.org/jira/browse/BEAM-5180
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-core
Affects Versions: 2.6.0
Reporter: Jozef Vilcek
Assignee: Kenneth Knowles


Recently this commit introduced stricter schema parsing, which breaks the contract 
between `FileResultCoder` and `FileSystems.matchNewResource()`.

The coder takes a `ResourceId`, serializes it via the `toString` method, and then 
relies on the filesystem being able to parse it back again. Requiring a strict 
`scheme://` breaks this at least for the Hadoop filesystem, which uses a `URI` for 
its `ResourceId` and produces `toString()` output in the form `hdfs:/some/path`.

I guess the `ResourceIdCoder` suffers from the same problem.

Either the scheme parsing should be made less strict or `ResourceId.toString()` for 
the Hadoop filesystem should be fixed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-4521) Backlog metrics not showing up

2018-08-02 Thread Jozef Vilcek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jozef Vilcek updated BEAM-4521:
---
Affects Version/s: 2.5.0

> Backlog metrics not showing up
> --
>
> Key: BEAM-4521
> URL: https://issues.apache.org/jira/browse/BEAM-4521
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.4.0, 2.5.0
>Reporter: Jozef Vilcek
>Assignee: Aljoscha Krettek
>Priority: Minor
>
> Hello,
> I wanted to track _backlog_bytes_ and _backlog_elements_ metrics from 
> SinkMetrics for kafka. I see in the code that KafkaUnboundedReader is 
> reporting them but was not able to get them visible (running on Flink).
> Metrics are reported here:
> [https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L668]
> In Flink runtime I noticed this message:
> {code:java}
> 2018-06-07 08:53:53,216 ERROR org.apache.beam.sdk.metrics.MetricsEnvironment  
>               - Unable to update metrics on the current thread. Most likely 
> caused by using metrics outside the managed work-execution thread.
> {code}
> I see that backlog is reported from getCheckpointMark(), which is done by 
> some other thread. Not sure why it is done there. 
> I tested locally to move it to the advance() method, where bytes_read is 
> reported and it worked. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-5028) getPerDestinationOutputFilenames() is getting processed before write is finished

2018-08-01 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16564812#comment-16564812
 ] 

Jozef Vilcek commented on BEAM-5028:


This one was already merged to master a while back but did not make it into the 
2.6.0 release candidate. Any chance it can be picked up, or is it too late?

> getPerDestinationOutputFilenames() is getting processed before write is 
> finished
> 
>
> Key: BEAM-5028
> URL: https://issues.apache.org/jira/browse/BEAM-5028
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-files
>Affects Versions: 2.5.0
>Reporter: Jozef Vilcek
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> I believe I am hitting a race condition between finalising writes and the 
> notification events about writes being done.
> It looks to me that WriteFilesResult.getPerDestinationOutputFilenames() can 
> announce files before they are finished:
> [https://github.com/apache/beam/blob/release-2.5.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L799]
> This is related to BEAM-3268 where it appears to not be fully fixed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-5028) getPerDestinationOutputFilenames() is getting processed before write is finished

2018-07-27 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559408#comment-16559408
 ] 

Jozef Vilcek commented on BEAM-5028:


This simple pull request seems to do the trick for my case

> getPerDestinationOutputFilenames() is getting processed before write is 
> finished
> 
>
> Key: BEAM-5028
> URL: https://issues.apache.org/jira/browse/BEAM-5028
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-files
>Affects Versions: 2.5.0
>Reporter: Jozef Vilcek
>Assignee: Eugene Kirpichov
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> I believe I am hitting a race condition between finalising writes and the 
> notification events about writes being done.
> It looks to me that WriteFilesResult.getPerDestinationOutputFilenames() can 
> announce files before they are finished:
> [https://github.com/apache/beam/blob/release-2.5.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L799]
> This is related to BEAM-3268 where it appears to not be fully fixed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-5036) Optimize FileBasedSink's WriteOperation.moveToOutput()

2018-07-27 Thread Jozef Vilcek (JIRA)
Jozef Vilcek created BEAM-5036:
--

 Summary: Optimize FileBasedSink's WriteOperation.moveToOutput()
 Key: BEAM-5036
 URL: https://issues.apache.org/jira/browse/BEAM-5036
 Project: Beam
  Issue Type: Improvement
  Components: io-java-files
Affects Versions: 2.5.0
Reporter: Jozef Vilcek
Assignee: Eugene Kirpichov


The moveToOutput() method in FileBasedSink.WriteOperation implements the move as 
copy+delete. It would be better to use rename(), which can be much more efficient 
on some filesystems.

The filesystem must support cross-directory rename. BEAM-4861 is related to this 
for the case of the HDFS filesystem.

Feature was discussed here:

http://mail-archives.apache.org/mod_mbox/beam-dev/201807.mbox/%3CCAF9t7_4Mp54pQ+vRrJrBh9Vx0=uaknupzd_qdh_qdm9vxll...@mail.gmail.com%3E
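As a rough local-filesystem analogy (using `java.nio`, not Beam's `FileBasedSink` or Hadoop's API; class and path names are mine), a move implemented as a single rename avoids streaming a copy and then deleting the source:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class MoveDemo {
    // Move by rename: typically a single metadata operation on filesystems
    // that support it, instead of a full copy followed by a delete.
    static void moveByRename(Path src, Path dst) throws IOException {
        Files.createDirectories(dst.getParent()); // cross-directory rename needs the target dir
        Files.move(src, dst);                     // renames within a store; copies only across stores
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("move-demo");
        Path src = Files.writeString(dir.resolve("tmp-output"), "data");
        Path dst = dir.resolve("final").resolve("output");
        moveByRename(src, dst);
        System.out.println(Files.exists(dst) && !Files.exists(src)); // true
    }
}
```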



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-5028) getPerDestinationOutputFilenames() is getting processed before write is finished

2018-07-26 Thread Jozef Vilcek (JIRA)
Jozef Vilcek created BEAM-5028:
--

 Summary: getPerDestinationOutputFilenames() is getting processed 
before write is finished
 Key: BEAM-5028
 URL: https://issues.apache.org/jira/browse/BEAM-5028
 Project: Beam
  Issue Type: Bug
  Components: io-java-files
Affects Versions: 2.5.0
Reporter: Jozef Vilcek
Assignee: Eugene Kirpichov


I believe I am hitting a race condition between finalising writes and the 
notification events about writes being done.

It looks to me that WriteFilesResult.getPerDestinationOutputFilenames() can 
announce files before they are finished:

[https://github.com/apache/beam/blob/release-2.5.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L799]

This is related to BEAM-3268, which appears not to be fully fixed.





[jira] [Updated] (BEAM-4861) Hadoop Filesystem silently fails

2018-07-25 Thread Jozef Vilcek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jozef Vilcek updated BEAM-4861:
---
Description: 
Hi,

Beam Filesystem operations copy, rename and delete return void in the SDK. The 
native Hadoop filesystem operations do not; they return a boolean. The current 
implementation in Beam ignores the result and passes as long as no exception is 
thrown.

My question is: is this by design or a bug?

I got burned by this when using 'rename' to do a 'move' operation on HDFS. If 
the target directory does not exist, the operation returns false and does not 
touch the file.

  was:
Hi,

Beam Filesystem operations copy, rename and delete return void in the SDK. The 
native Hadoop filesystem operations do not; they return a boolean. The current 
implementation in Beam ignores the result and passes as long as no exception is 
thrown.

My question is whether this is by design or a bug.

I got burned by this when using 'rename' to do a 'move' operation on HDFS. If 
the target directory does not exist, the operation returns false and does not 
touch the file.


> Hadoop Filesystem silently fails
> 
>
> Key: BEAM-4861
> URL: https://issues.apache.org/jira/browse/BEAM-4861
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-hadoop
>Reporter: Jozef Vilcek
>Assignee: Chamikara Jayalath
>Priority: Major
>
> Hi,
> Beam Filesystem operations copy, rename and delete return void in the SDK. The 
> native Hadoop filesystem operations do not; they return a boolean. The current 
> implementation in Beam ignores the result and passes as long as no exception is 
> thrown.
> My question is: is this by design or a bug?
> I got burned by this when using 'rename' to do a 'move' operation on HDFS. If 
> the target directory does not exist, the operation returns false and does not 
> touch the file.





[jira] [Created] (BEAM-4861) Hadoop Filesystem silently fails

2018-07-25 Thread Jozef Vilcek (JIRA)
Jozef Vilcek created BEAM-4861:
--

 Summary: Hadoop Filesystem silently fails
 Key: BEAM-4861
 URL: https://issues.apache.org/jira/browse/BEAM-4861
 Project: Beam
  Issue Type: Bug
  Components: io-java-hadoop
Reporter: Jozef Vilcek
Assignee: Chamikara Jayalath


Hi,

Beam Filesystem operations copy, rename and delete return void in the SDK. The 
native Hadoop filesystem operations do not; they return a boolean. The current 
implementation in Beam ignores the result and passes as long as no exception is 
thrown.

My question is whether this is by design or a bug.

I got burned by this when using 'rename' to do a 'move' operation on HDFS. If 
the target directory does not exist, the operation returns false and does not 
touch the file.
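As an illustration only, the silent failure can be surfaced by checking the boolean result. This sketch uses java.io.File, whose renameTo reports failure the same way; checkedRename is a hypothetical helper name, not a Beam API.

```java
import java.io.File;
import java.io.IOException;

public class CheckedRename {
    // Convert the boolean result of a rename into an exception, so that a
    // failed move (e.g. a missing target directory) cannot pass silently.
    public static void checkedRename(File src, File dst) throws IOException {
        if (!src.renameTo(dst)) {
            throw new IOException("rename failed: " + src + " -> " + dst);
        }
    }

    public static void main(String[] args) throws IOException {
        File src = File.createTempFile("src", ".tmp");
        // Renaming into a directory that does not exist returns false,
        // which the wrapper turns into an exception.
        File missingDir = new File(src.getParentFile(), "does-not-exist");
        try {
            checkedRename(src, new File(missingDir, "dst.tmp"));
        } catch (IOException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```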





[jira] [Updated] (BEAM-4861) Hadoop Filesystem silently fails

2018-07-25 Thread Jozef Vilcek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jozef Vilcek updated BEAM-4861:
---
Description: 
Hi,

Beam Filesystem operations copy, rename and delete return void in the SDK. The 
native Hadoop filesystem operations do not; they return a boolean. The current 
implementation in Beam ignores the result and passes as long as no exception is 
thrown.

I got burned by this when using 'rename' to do a 'move' operation on HDFS. If 
the target directory does not exist, the operation returns false and does not 
touch the file.

[https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L148]

  was:
Hi,

Beam Filesystem operations copy, rename and delete return void in the SDK. The 
native Hadoop filesystem operations do not; they return a boolean. The current 
implementation in Beam ignores the result and passes as long as no exception is 
thrown.

My question is: is this by design or a bug?

I got burned by this when using 'rename' to do a 'move' operation on HDFS. If 
the target directory does not exist, the operation returns false and does not 
touch the file.

https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L148


> Hadoop Filesystem silently fails
> 
>
> Key: BEAM-4861
> URL: https://issues.apache.org/jira/browse/BEAM-4861
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-hadoop
>Reporter: Jozef Vilcek
>Assignee: Chamikara Jayalath
>Priority: Major
>
> Hi,
> Beam Filesystem operations copy, rename and delete return void in the SDK. The 
> native Hadoop filesystem operations do not; they return a boolean. The current 
> implementation in Beam ignores the result and passes as long as no exception is 
> thrown.
> I got burned by this when using 'rename' to do a 'move' operation on HDFS. If 
> the target directory does not exist, the operation returns false and does not 
> touch the file.
> [https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L148]





[jira] [Updated] (BEAM-4861) Hadoop Filesystem silently fails

2018-07-25 Thread Jozef Vilcek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jozef Vilcek updated BEAM-4861:
---
Description: 
Hi,

Beam Filesystem operations copy, rename and delete return void in the SDK. The 
native Hadoop filesystem operations do not; they return a boolean. The current 
implementation in Beam ignores the result and passes as long as no exception is 
thrown.

My question is: is this by design or a bug?

I got burned by this when using 'rename' to do a 'move' operation on HDFS. If 
the target directory does not exist, the operation returns false and does not 
touch the file.

https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L148

  was:
Hi,

Beam Filesystem operations copy, rename and delete return void in the SDK. The 
native Hadoop filesystem operations do not; they return a boolean. The current 
implementation in Beam ignores the result and passes as long as no exception is 
thrown.

My question is: is this by design or a bug?

I got burned by this when using 'rename' to do a 'move' operation on HDFS. If 
the target directory does not exist, the operation returns false and does not 
touch the file.


> Hadoop Filesystem silently fails
> 
>
> Key: BEAM-4861
> URL: https://issues.apache.org/jira/browse/BEAM-4861
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-hadoop
>Reporter: Jozef Vilcek
>Assignee: Chamikara Jayalath
>Priority: Major
>
> Hi,
> Beam Filesystem operations copy, rename and delete return void in the SDK. The 
> native Hadoop filesystem operations do not; they return a boolean. The current 
> implementation in Beam ignores the result and passes as long as no exception is 
> thrown.
> My question is: is this by design or a bug?
> I got burned by this when using 'rename' to do a 'move' operation on HDFS. If 
> the target directory does not exist, the operation returns false and does not 
> touch the file.
> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L148





[jira] [Commented] (BEAM-3268) getPerDestinationOutputFilenames() is getting processed before write is finished on dataflow runner

2018-07-24 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554249#comment-16554249
 ] 

Jozef Vilcek commented on BEAM-3268:


I believe I am facing this issue with windowed writes on the Flink runner with 
Beam 2.5.0. Accessing the files announced by the FileIO write result in a map 
function fails from time to time, and starts working after adding some wait time.

Can it be that the bug is still present in some cases?

> getPerDestinationOutputFilenames() is getting processed before write is 
> finished on dataflow runner
> ---
>
> Key: BEAM-3268
> URL: https://issues.apache.org/jira/browse/BEAM-3268
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Affects Versions: 2.3.0
>Reporter: Kamil Szewczyk
>Assignee: Eugene Kirpichov
>Priority: Major
> Fix For: 2.5.0
>
> Attachments: comparison.png
>
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> While running the file-based IO tests we found the Dataflow runner misbehaving. 
> We run the tests using a single pipeline, and without a Reshuffle between 
> writing and reading, Dataflow jobs are unsuccessful because the runner tries to 
> access files that have not been created yet.
> The picture shows the difference in execution of the write: on the left is a 
> working example with the Reshuffle added, and on the right without it.
> !comparison.png|thumbnail!
> Steps to reproduce: substitute your-bucket-name with your valid bucket.
> {code:java}
> mvn -e -Pio-it verify -pl sdks/java/io/file-based-io-tests 
> -DintegrationTestPipelineOptions='["--runner=dataflow", 
> "--filenamePrefix=gs://your-bucket-name/TEXTIO_IT"]' -Pdataflow-runner
> {code}
> Then look at the Cloud Console; the job should fail.
> Now add a Reshuffle to 
> sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
>  as in the example.
> {code:java}
> .getPerDestinationOutputFilenames().apply(Values.create())
> .apply(Reshuffle.viaRandomKey());
> PCollection<String> consolidatedHashcode = testFilenames
> {code}
> and rerun the previously used Maven command to see it now working in the 
> console.





[jira] [Commented] (BEAM-3919) checkpoint can not work with flink 1.4.1,1.4.2

2018-07-21 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551561#comment-16551561
 ] 

Jozef Vilcek commented on BEAM-3919:


I see Beam's master using Flink 1.5.0 in its dependencies, so Beam 2.6.0 is most 
probably what we are waiting for.

> checkpoint can not work with flink 1.4.1,1.4.2
> --
>
> Key: BEAM-3919
> URL: https://issues.apache.org/jira/browse/BEAM-3919
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.3.0, 2.4.0
>Reporter: eisig
>Assignee: Harshal Tripathi
>Priority: Critical
>
> When submitting an application to a Flink cluster (1.4.1, 1.4.2) with 
> checkpointing enabled, the job fails with the exception:
> java.lang.NoSuchMethodError: 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup(Lorg/apache/flink/core/memory/DataOutputViewStreamWrapper;I)V
>  
> It seems that 
> `org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup`
> was changed in Flink 1.4.1.





[jira] [Commented] (BEAM-3919) checkpoint can not work with flink 1.4.1,1.4.2

2018-07-21 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551557#comment-16551557
 ] 

Jozef Vilcek commented on BEAM-3919:


Unfortunately, Beam 2.5.0 still targets Flink 1.4.0, therefore the issue is 
still there. An ETA for moving to Flink 1.5.0 would be much appreciated.

> checkpoint can not work with flink 1.4.1,1.4.2
> --
>
> Key: BEAM-3919
> URL: https://issues.apache.org/jira/browse/BEAM-3919
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.3.0, 2.4.0
>Reporter: eisig
>Assignee: Harshal Tripathi
>Priority: Critical
>
> When submitting an application to a Flink cluster (1.4.1, 1.4.2) with 
> checkpointing enabled, the job fails with the exception:
> java.lang.NoSuchMethodError: 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup(Lorg/apache/flink/core/memory/DataOutputViewStreamWrapper;I)V
>  
> It seems that 
> `org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup`
> was changed in Flink 1.4.1.





[jira] [Created] (BEAM-4758) Avro-Protobuf support

2018-07-11 Thread Jozef Vilcek (JIRA)
Jozef Vilcek created BEAM-4758:
--

 Summary: Avro-Protobuf support
 Key: BEAM-4758
 URL: https://issues.apache.org/jira/browse/BEAM-4758
 Project: Beam
  Issue Type: Wish
  Components: io-java-avro
Reporter: Jozef Vilcek
Assignee: Eugene Kirpichov


I came across avro-protobuf support, which allows writing Protobuf messages to 
an Avro file and then reading them back a) as GenericRecords or b) as 
Protobuf-typed messages.

It is quite a nice feature that I am looking for. I do not know much about what 
happens behind the scenes, or whether there are hidden problems and limitations, 
e.g. on schema compatibility when evolving proto messages.

So what I want to ask is:
 # Was this feature ever considered, i.e. enabling the existing AvroIO code to 
also use ProtobufDatumReader and ProtobufDatumWriter? Any wisdom on what I 
should be aware of?
 # Would it make sense to add it to AvroIO?

 

P.S.: I wanted to add it on the side for my own needs and test it, but I did not 
find an easy way of doing so. The writer infrastructure is easy to extend by 
implementing a custom FileIO.Sink, but the read side seems to be a hard nut to 
crack without copy-pasting a lot of code from AvroIO and AvroSource.





[jira] [Created] (BEAM-4672) Dynamic destination write to HDFS fails with IntervalWindow in shard template

2018-06-28 Thread Jozef Vilcek (JIRA)
Jozef Vilcek created BEAM-4672:
--

 Summary: Dynamic destination write to HDFS fails with 
IntervalWindow in shard template
 Key: BEAM-4672
 URL: https://issues.apache.org/jira/browse/BEAM-4672
 Project: Beam
  Issue Type: Bug
  Components: io-java-hadoop
Affects Versions: 2.4.0
Reporter: Jozef Vilcek
Assignee: Chamikara Jayalath


I am doing a windowed write with dynamic destinations and it is failing with 
the error message:
{noformat}
java.lang.IllegalArgumentException: Expect srcResourceIds and destResourceIds 
have the same scheme, but received hdfs, window-1970-01-01T00{noformat}
 

I am writing from an IntervalWindow, for which the target file name created by 
DefaultFilenamePolicy is going to look something like this:
{noformat}
window-1970-01-01T00:00:00.000Z-1970-01-01T01:00:00.000Z-pane-0-last-0-of-1.avro{noformat}
When resolving the path, HadoopResourceId is used, and it is backed by a Java 
URI. A filename like the above is parsed incorrectly because of the ':' 
characters introduced by the window-to-string conversion.
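The parsing problem can be reproduced with java.net.URI alone: everything before the first ':' in the filename happens to be syntactically valid as a URI scheme (letters, digits and '-'), so URI splits it off as one, which matches the "received hdfs, window-1970-01-01T00" part of the error message.

```java
import java.net.URI;

public class WindowFilenameUri {
    public static void main(String[] args) {
        // Shape of a filename produced by DefaultFilenamePolicy for an
        // IntervalWindow (shortened here for readability).
        String name = "window-1970-01-01T00:00:00.000Z-pane-0-last-0-of-1.avro";
        URI uri = URI.create(name);
        // URI takes "window-1970-01-01T00" to be the scheme.
        System.out.println(uri.getScheme());
    }
}
```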

 





[jira] [Commented] (BEAM-4521) Backlog metrics not showing up

2018-06-11 Thread Jozef Vilcek (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16507751#comment-16507751
 ] 

Jozef Vilcek commented on BEAM-4521:


[~rangadi], after a little bit of looking around the metrics environment, yes, 
it does seem like the Runner should be able to provide context for the 
checkpointing operation. I will see what I can find.

I am not suggesting doing everything in advance(); it was merely a test for me 
to verify the problem. But it is true that half of the backlog reporting 
(backlogMessageCount) is already being materialised for TimestampPolicyContext 
every time advance() is called.

> Backlog metrics not showing up
> --
>
> Key: BEAM-4521
> URL: https://issues.apache.org/jira/browse/BEAM-4521
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.4.0
>Reporter: Jozef Vilcek
>Assignee: Aljoscha Krettek
>Priority: Minor
>
> Hello,
> I wanted to track _backlog_bytes_ and _backlog_elements_ metrics from 
> SinkMetrics for Kafka. I see in the code that KafkaUnboundedReader reports 
> them, but I was not able to make them visible (running on Flink).
> Metrics are reported here:
> [https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L668]
> In Flink runtime I noticed this message:
> {code:java}
> 2018-06-07 08:53:53,216 ERROR org.apache.beam.sdk.metrics.MetricsEnvironment  
>               - Unable to update metrics on the current thread. Most likely 
> caused by using metrics outside the managed work-execution thread.
> {code}
> I see that the backlog is reported from getCheckpointMark(), which is called 
> by some other thread. Not sure why it is done there.
> I tested locally moving it to the advance() method, where bytes_read is 
> reported, and it worked.





[jira] [Created] (BEAM-4521) Backlog metrics not showing up

2018-06-08 Thread Jozef Vilcek (JIRA)
Jozef Vilcek created BEAM-4521:
--

 Summary: Backlog metrics not showing up
 Key: BEAM-4521
 URL: https://issues.apache.org/jira/browse/BEAM-4521
 Project: Beam
  Issue Type: Bug
  Components: io-java-kafka
Affects Versions: 2.4.0
Reporter: Jozef Vilcek
Assignee: Raghu Angadi


Hello,

I wanted to track _backlog_bytes_ and _backlog_elements_ metrics from 
SinkMetrics for Kafka. I see in the code that KafkaUnboundedReader reports them, 
but I was not able to make them visible (running on Flink).

Metrics are reported here:

[https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L668]

In Flink runtime I noticed this message:
{code:java}
2018-06-07 08:53:53,216 ERROR org.apache.beam.sdk.metrics.MetricsEnvironment    
            - Unable to update metrics on the current thread. Most likely 
caused by using metrics outside the managed work-execution thread.
{code}
I see that the backlog is reported from getCheckpointMark(), which is called by 
some other thread. Not sure why it is done there.

I tested locally moving it to the advance() method, where bytes_read is 
reported, and it worked.
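The "Unable to update metrics on the current thread" error is consistent with a thread-scoped metrics container. The following is only a minimal ThreadLocal sketch of that mechanism (not Beam's actual MetricsEnvironment code): an update made from a separate checkpointing thread does not see the container installed on the work-execution thread.

```java
public class ThreadScopedMetrics {
    // Stand-in for a metrics container installed per work-execution thread.
    static final ThreadLocal<long[]> CONTAINER = new ThreadLocal<>();

    public static void main(String[] args) throws InterruptedException {
        CONTAINER.set(new long[1]); // the "managed" thread has a container
        CONTAINER.get()[0] += 10;   // update from advance(): container present

        Thread checkpointer = new Thread(() -> {
            // Update attempted from getCheckpointMark()'s thread: no container
            // was ever installed here, so get() returns null.
            System.out.println("container on other thread: " + CONTAINER.get());
        });
        checkpointer.start();
        checkpointer.join();
        System.out.println("backlog on managed thread: " + CONTAINER.get()[0]);
    }
}
```

This matches the observation that reporting from advance(), on the thread where bytes_read is also reported, made the metrics show up.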





[jira] [Commented] (BEAM-302) Add Scio Scala DSL to Beam

2018-03-30 Thread Jozef Vilcek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420307#comment-16420307
 ] 

Jozef Vilcek commented on BEAM-302:
---

Hello, I do not want to bug you, but is there an update on this JIRA? I get 
that we are all busy, therefore I am not looking for an exact ETA. I am looking 
for information on how likely it is that Beam will get a Scala-friendly API in 
the near future. [~sinisa_lyh], is integrating Scio into Beam still the plan?

> Add Scio Scala DSL to Beam
> --
>
> Key: BEAM-302
> URL: https://issues.apache.org/jira/browse/BEAM-302
> Project: Beam
>  Issue Type: Wish
>  Components: sdk-ideas
>Reporter: Jean-Baptiste Onofré
>Priority: Major
>






[jira] [Commented] (BEAM-302) Add Scio Scala DSL to Beam

2018-02-08 Thread Jozef Vilcek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356923#comment-16356923
 ] 

Jozef Vilcek commented on BEAM-302:
---

BEAM-1920 is resolved. Looking forward to seeing the next steps of integrating 
Scio with Beam.

> Add Scio Scala DSL to Beam
> --
>
> Key: BEAM-302
> URL: https://issues.apache.org/jira/browse/BEAM-302
> Project: Beam
>  Issue Type: Wish
>  Components: sdk-ideas
>Reporter: Jean-Baptiste Onofré
>Priority: Major
>



