Thanks for sharing that

Tim,

> On 26 Oct 2018, at 17:50, Juan Carlos Garcia <jcgarc...@gmail.com> wrote:
> 
> Just so everyone knows: we figured it out, and it was an environment problem.
> 
> In our case the cluster sits in a network that is not directly accessible, 
> so to deploy we need to use Jenkins with some slaves that have access to 
> that network.
> 
> During deployment, the main method of the class executes 
> FileSystems.setDefaultPipelineOptions(_options);, which triggers the 
> HadoopFileSystemOptionsRegistrar via the ServiceLoader mechanism, and this 
> reads the environment variable HADOOP_CONF_DIR in order to register the 
> filesystem correctly.
> 
> So it's very important that the machine you use for deployment has that 
> environment variable set as well (not only the workers where the pipeline 
> will run).
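
A minimal sketch of a pre-flight check for the point above, assuming you want the submission to fail fast when the launcher's environment is incomplete. The class name HadoopEnvCheck and the method requireHadoopConfDir are hypothetical (they are not part of Beam); the only Beam call referred to, FileSystems.setDefaultPipelineOptions, appears in a comment so that the snippet compiles with the JDK alone:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;

// Hypothetical helper, not part of Beam: verifies the launcher's environment.
public class HadoopEnvCheck {

    /**
     * Returns the directory named by HADOOP_CONF_DIR in the given environment.
     * Throws IllegalStateException if the variable is missing, blank, or does
     * not point to an existing directory.
     */
    public static Path requireHadoopConfDir(Map<String, String> env) {
        String value = env.get("HADOOP_CONF_DIR");
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException(
                "HADOOP_CONF_DIR is not set in the environment of the submitting process");
        }
        Path dir = Paths.get(value);
        if (!Files.isDirectory(dir)) {
            throw new IllegalStateException("HADOOP_CONF_DIR is not a directory: " + dir);
        }
        return dir;
    }

    public static void main(String[] args) {
        // Check the environment of the process that submits the job,
        // before any pipeline construction happens.
        Path confDir = requireHadoopConfDir(System.getenv());
        System.out.println("Using Hadoop configuration from " + confDir);
        // In the real main method, the call to
        // FileSystems.setDefaultPipelineOptions(options) would follow here.
    }
}
```

Run at the top of main on the deployment machine, a check like this would likely have reported the missing variable at submission time instead of leaving it to surface later as "No filesystem found for scheme hdfs".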
> 
> In our case the variable was set in the .bashrc of the user used for 
> deployment, but here is the catch.
> 
> We were using "sudo -u DEPLOYMENT_USER -s /var/lib/flink/bin/flink run -d 
> .........", but the -s flag does not execute the user's .bashrc 
> (.bash_profile), hence the failures at runtime. The fix was simply to 
> replace the -s flag with -i, to make sure the environment variable is 
> present when the command runs.
> 
> Thanks
> 
> 
>> On Fri, Oct 26, 2018 at 1:52 PM Juan Carlos Garcia <jcgarc...@gmail.com> 
>> wrote:
>> Hi Tim,
>> 
>> I am using FileIO directly with AvroIO.sink(...). However, having 
>> experienced BEAM-2277 with the SparkRunner a few months ago, I get the 
>> feeling this is something different (maybe a mismatched or missing 
>> dependency).
>> 
>> Thanks
>> 
>>> On Fri, Oct 26, 2018 at 1:33 PM Tim Robertson <timrobertson...@gmail.com> 
>>> wrote:
>>> Hi Juan
>>> 
>>> This sounds reminiscent of https://issues.apache.org/jira/browse/BEAM-2277 
>>> which we believed was fixed in 2.7.0.
>>> What IO are you using to write your files, and can you paste a snippet of 
>>> your code please?
>>> 
>>> On BEAM-2277 I posted a workaround for AvroIO (it might help you find a 
>>> workaround too):
>>> 
>>>  transform.apply("Write",
>>>         AvroIO.writeGenericRecords(schema)
>>>             .to(FileSystems.matchNewResource(options.getTarget(), true))
>>>             // BEAM-2277 workaround
>>>             .withTempDirectory(
>>>                 FileSystems.matchNewResource("hdfs://ha-nn/tmp/beam-avro", true)));
>>> 
>>> Thanks
>>> Tim
>>> 
>>> 
>>>> On Fri, Oct 26, 2018 at 11:47 AM Juan Carlos Garcia <jcgarc...@gmail.com> 
>>>> wrote:
>>>> Hi Folks,
>>>> 
>>>> I have a strange situation while running Beam 2.7.0 with the FlinkRunner. 
>>>> My setup consists of an HA Flink cluster (1.5.4) that writes its 
>>>> checkpoints to HDFS. Flink is able to write its checkpoints / savepoints 
>>>> to HDFS without any problems.
>>>> 
>>>> However, my pipeline has to write to HDFS as well, but fails with "Caused 
>>>> by: java.lang.IllegalArgumentException: No filesystem found for scheme 
>>>> hdfs"
>>>> (stacktrace at the bottom)
>>>> 
>>>> On the host where the pipeline is running:
>>>> 1. The environment variable HADOOP_CONF_DIR is set.
>>>> 2. During pipeline construction I explicitly call 
>>>> FileSystems.setDefaultPipelineOptions(_options); to trigger the 
>>>> ServiceLoader to find all options registrars on the classpath.
>>>> 3. If I inspect the field SCHEME_TO_FILESYSTEM of the class FileSystems 
>>>> in my main method using reflection, I can see that at launch time it 
>>>> contains:
>>>>    {file=org.apache.beam.sdk.io.LocalFileSystem@1941a8ff, 
>>>> hdfs=org.apache.beam.sdk.io.hdfs.HadoopFileSystem@22d7b4f8}
>>>> 
>>>> Any idea what I am doing wrong with the HDFS integration?
>>>>    
>>>> {snippet}
>>>> // Register the filesystems found on the classpath.
>>>> FileSystems.setDefaultPipelineOptions(_context.getPipelineOptions());
>>>> // Read the private static registry via reflection to see what was registered.
>>>> Field f = FileSystems.class.getDeclaredField("SCHEME_TO_FILESYSTEM");
>>>> f.setAccessible(true);
>>>> AtomicReference<Map<String, FileSystem>> value =
>>>>     (AtomicReference<Map<String, FileSystem>>) f.get(null);
>>>> System.out.println("===========================");
>>>> System.out.println(value);
>>>> {snippet}
>>>> 
>>>> {stacktrace}
>>>> Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme hdfs
>>>>         at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>>>>         at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>>>>         at org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink.lambda$new$22b9c623$1(FileIO.java:1293)
>>>>         at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>>>         at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>>>         at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>>>         at org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:920)
>>>>         at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:715)
>>>> {stacktrace}
>>>> 
>>>> -- 
>>>> 
>>>> JC 
>>>> 
>> 
>> 
>> -- 
>> 
>> JC 
>> 
> 
> 
> -- 
> 
> JC 
> 
