GCS numShards doubt
hi folks, I have this in code:

    globalIndexJson.apply("GCSOutput", TextIO.write().to(fullGCSPath).withSuffix(".txt").withNumShards(500));

The same code is executed for 50 GB, 3 TB, and 5 TB of data. I want to know: will changing numShards for the larger data sizes make the write to GCS faster?
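For what it's worth, withNumShards(500) forces exactly 500 output files regardless of input size, which requires an extra redistribution of the data, while withNumShards(0) lets the runner pick the sharding. If a fixed count is wanted, one option is to scale it with the input size. A minimal sketch of that idea — the helper name, the 256 MB target, and the 10,000 cap are all assumptions for illustration, not Beam defaults:

```java
// Hypothetical helper: scale the shard count with input size so each
// output file lands near a target size, instead of hard-coding 500.
// TARGET_SHARD_BYTES and the 10_000 cap are arbitrary assumptions.
class ShardHeuristic {
    static final long TARGET_SHARD_BYTES = 256L * 1024 * 1024; // ~256 MB per file

    static int suggestNumShards(long inputBytes) {
        long shards = inputBytes / TARGET_SHARD_BYTES;
        // At least 1 shard; cap it to avoid flooding GCS with tiny files.
        return (int) Math.min(Math.max(shards, 1L), 10_000L);
    }

    public static void main(String[] args) {
        System.out.println(suggestNumShards(50L * 1024 * 1024 * 1024));       // 50 GB -> 200
        System.out.println(suggestNumShards(3L * 1024 * 1024 * 1024 * 1024)); // 3 TB  -> 10000
    }
}
```

The result would then feed `withNumShards(suggestNumShards(inputBytes))` in place of the hard-coded 500 — but measuring both settings on a real 3 TB run is the only reliable way to know which writes faster.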
Re: pipeline executed twice, unable to figure out the reason
I will try to add more details to make the problem clearer:

- The whole pipeline is executed twice, as far as I can see. There are two calls to ES to create an index; that's how I'm verifying that the pipeline runs twice.
- The log file is converted to txt format only to make it easy to open in a browser. From the log file I figured out that two calls are being made to ES.

The structure of the code is roughly as shown in the attached code.java. The way I run the pipeline is in the previous thread. Let me know if anything about the problem is unclear.

On Sun, Feb 9, 2020 at 5:05 PM vivek chaurasiya wrote:
> Hey team,
>
> My beam pipeline seems to be executing twice. The business logic of the beam
> pipeline is to create one ElasticSearch index. But since it executes twice,
> the "spark-submit" command always fails, and that fails my automation.
>
> Attached are the logs.
>
> I am running spark-submit on AWS EMR like this:
>
> spark-submit --deploy-mode cluster \
>   --conf spark.executor.extraJavaOptions=-DCLOUD_PLATFORM=AWS \
>   --conf spark.driver.extraJavaOptions=-DCLOUD_PLATFORM=AWS \
>   --conf spark.yarn.am.waitTime=300s \
>   --conf spark.executor.extraClassPath=__app__.jar \
>   --driver-memory 8G --num-executors 5 --executor-memory 20G --executor-cores 6 \
>   --jars s3://vivek-tests/cloud-dataflow-1.0.jar \
>   --name new_user_index_mappings_create_dev \
>   --class com.noka.beam.common.pipeline.EMRSparkStartPipeline \
>   s3://vivek-tests/cloud-dataflow-1.0.jar \
>   --job=new-user-index-mappings-create --dateTime=2020-02-04T00:00:00 \
>   --isDev=True --incrementalExport=False
>
> Note: the code has been working as expected (i.e. one run of create-index)
> on AWS EMR 5.17, but recently we upgraded to AWS EMR 5.29.
>
> Does someone know if something changed in the framework, or am I doing
> something wrong? Please help!
> Thanks
> Vivek

Container: container_1581290593006_0004_01_01 on ip-172-31-10-196.ec2.internal_8041
LogType: stderr
Log Upload Time: Mon Feb 10 00:19:13 + 2020
LogLength: 20509
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/yarn/usercache/hadoop/filecache/13/__spark_libs__4137318382159335040.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
20/02/10 00:18:59 INFO SignalUtils: Registered signal handler for TERM
20/02/10 00:18:59 INFO SignalUtils: Registered signal handler for HUP
20/02/10 00:18:59 INFO SignalUtils: Registered signal handler for INT
20/02/10 00:18:59 INFO SecurityManager: Changing view acls to: yarn,hadoop
20/02/10 00:18:59 INFO SecurityManager: Changing modify acls to: yarn,hadoop
20/02/10 00:18:59 INFO SecurityManager: Changing view acls groups to:
20/02/10 00:18:59 INFO SecurityManager: Changing modify acls groups to:
20/02/10 00:18:59 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, hadoop); groups with view permissions: Set(); users with modify permissions: Set(yarn, hadoop); groups with modify permissions: Set()
20/02/10 00:19:00 INFO ApplicationMaster: Preparing Local resources
20/02/10 00:19:01 INFO ApplicationMaster: ApplicationAttemptId: appattempt_1581290593006_0004_01
20/02/10 00:19:01 INFO ApplicationMaster: Starting the user application in a separate Thread
20/02/10 00:19:01 INFO ApplicationMaster: Waiting for spark context initialization...
20/02/10 00:19:02 INFO SparkRunner: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 1 files. Enable logging at DEBUG level to see which files will be staged.
20/02/10 00:19:03 INFO NewUserESIndexMappingsCreateTask: esIndexName = new_users_index_20200204
20/02/10 00:19:03 INFO log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
20/02/10 00:19:03 INFO log: requestBody = { "aliases": null, "mappings": { "_source": { "excludes": ["j"] }, "properties": { "a1": { "type": "keyword" }, "a30": { "type": "keyword" }, "a": { "type": "keyword" }, "c": { "type": "keyword" }, "j": { "type": "text", "index_options": "docs", "analyzer": "list", "eager_global_ordinals& [log truncated]
pipeline executed twice, unable to figure out the reason
Hey team,

My beam pipeline seems to be executing twice. The business logic of the beam pipeline is to create one ElasticSearch index. But since it executes twice, the "spark-submit" command always fails, and that fails my automation.

Attached are the logs.

I am running spark-submit on AWS EMR like this:

    spark-submit --deploy-mode cluster \
      --conf spark.executor.extraJavaOptions=-DCLOUD_PLATFORM=AWS \
      --conf spark.driver.extraJavaOptions=-DCLOUD_PLATFORM=AWS \
      --conf spark.yarn.am.waitTime=300s \
      --conf spark.executor.extraClassPath=__app__.jar \
      --driver-memory 8G --num-executors 5 --executor-memory 20G --executor-cores 6 \
      --jars s3://vivek-tests/cloud-dataflow-1.0.jar \
      --name new_user_index_mappings_create_dev \
      --class com.noka.beam.common.pipeline.EMRSparkStartPipeline \
      s3://vivek-tests/cloud-dataflow-1.0.jar \
      --job=new-user-index-mappings-create --dateTime=2020-02-04T00:00:00 \
      --isDev=True --incrementalExport=False

Note: the code has been working as expected (i.e. one run of create-index) on AWS EMR 5.17, but recently we upgraded to AWS EMR 5.29.

Does someone know if something changed in the framework, or am I doing something wrong? Please help!

Thanks
Vivek

Attachment: application_1581290593006_0004.log (binary data)
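Not a confirmed diagnosis, but one common way a YARN cluster-mode driver appears to "run twice" is a second ApplicationMaster attempt: if the first attempt fails after the index was already created, YARN retries the whole driver, and the rerun then trips over the existing index. `spark.yarn.maxAppAttempts` is a standard Spark-on-YARN property; whether it applies to this job is an assumption, but capping attempts would rule it in or out:

```shell
# Sketch: same spark-submit as above, with AM retries capped at one attempt
# so a failed first attempt is not silently re-run by YARN.
spark-submit --deploy-mode cluster \
  --conf spark.yarn.maxAppAttempts=1 \
  --conf spark.executor.extraJavaOptions=-DCLOUD_PLATFORM=AWS \
  --conf spark.driver.extraJavaOptions=-DCLOUD_PLATFORM=AWS \
  ... # remaining flags unchanged
```

If the pipeline still runs twice with attempts capped at one, the duplication is happening somewhere else (e.g. in the driver code itself) rather than in YARN's retry logic.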
Re: Problem with updating beam SDK
Can someone comment here?

On Fri, Feb 7, 2020, 11:52 PM vivek chaurasiya wrote:
> Hi team,
>
> We had Beam SDK 2.5 running on AWS EMR Spark distribution 5.17.
>
> Essentially our beam code was just reading a bunch of files from GCS and
> pushing them to ElasticSearch in AWS using Beam's ElasticsearchIO class
> (https://beam.apache.org/releases/javadoc/2.0.0/index.html?org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.html).
> So there is just a Map step; no reduce/groupBy/etc. in the beam code.
>
> Basically my code is doing:
>
>     PCollection coll = // read GCS
>     coll.apply(ElasticsearchIO.write())
>
> We submit the job using 'spark-submit':
>
>     spark-submit --deploy-mode cluster \
>       --conf spark.executor.extraJavaOptions=-DCLOUD_PLATFORM=AWS \
>       --conf spark.driver.extraJavaOptions=-DCLOUD_PLATFORM=AWS \
>       --conf spark.yarn.am.waitTime=300s \
>       --conf spark.executor.extraClassPath=__app__.jar \
>       --driver-memory 8G --num-executors 5 --executor-memory 20G --executor-cores 8 \
>       --jars s3://snap-search-spark/cloud-dataflow-1.0.jar \
>       --class com.snapchat.beam.common.pipeline.EMRSparkStartPipeline \
>       s3://snap-search-spark/cloud-dataflow-1.0.jar \
>       --job=fgi-export --isSolr=false --dateTime=2020-01-31T00:00:00 \
>       --isDev=true --incrementalExport=false
>
> The dump to ES was finishing in at most 1 hour.
>
> This week we upgraded the Beam SDK to 2.18, still running on AWS EMR Spark
> distribution 5.17. We observe that the export process has become really
> slow, around 9 hours. The GCS file size is ~50 GB (500 files of 100 MB each).
>
> I am new to the Spark UI and AWS EMR, but I still tried to see why this
> slowness is happening. A few observations:
>
> 1) Some executors died after getting SIGTERM. I tried the steps in
> https://dev.sobeslavsky.net/apache-spark-sigterm-mystery-with-dynamic-allocation/
> but no luck.
>
> 2) I will try upgrading to AWS EMR Spark distribution 5.29, but will have
> to test it.
>
> Has anyone seen similar issues in the past? Suggestions would be highly
> appreciated.
>
> Thanks
> Vivek
Problem with updating beam SDK
Hi team,

We had Beam SDK 2.5 running on AWS EMR Spark distribution 5.17.

Essentially our beam code was just reading a bunch of files from GCS and pushing them to ElasticSearch in AWS using Beam's ElasticsearchIO class (https://beam.apache.org/releases/javadoc/2.0.0/index.html?org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.html). So there is just a Map step; no reduce/groupBy/etc. in the beam code.

Basically my code is doing:

    PCollection coll = // read GCS
    coll.apply(ElasticsearchIO.write())

We submit the job using 'spark-submit':

    spark-submit --deploy-mode cluster \
      --conf spark.executor.extraJavaOptions=-DCLOUD_PLATFORM=AWS \
      --conf spark.driver.extraJavaOptions=-DCLOUD_PLATFORM=AWS \
      --conf spark.yarn.am.waitTime=300s \
      --conf spark.executor.extraClassPath=__app__.jar \
      --driver-memory 8G --num-executors 5 --executor-memory 20G --executor-cores 8 \
      --jars s3://snap-search-spark/cloud-dataflow-1.0.jar \
      --class com.snapchat.beam.common.pipeline.EMRSparkStartPipeline \
      s3://snap-search-spark/cloud-dataflow-1.0.jar \
      --job=fgi-export --isSolr=false --dateTime=2020-01-31T00:00:00 \
      --isDev=true --incrementalExport=false

The dump to ES was finishing in at most 1 hour.

This week we upgraded the Beam SDK to 2.18, still running on AWS EMR Spark distribution 5.17. We observe that the export process has become really slow, around 9 hours. The GCS file size is ~50 GB (500 files of 100 MB each).

I am new to the Spark UI and AWS EMR, but I still tried to see why this slowness is happening. A few observations:

1) Some executors died after getting SIGTERM. I tried the steps in https://dev.sobeslavsky.net/apache-spark-sigterm-mystery-with-dynamic-allocation/ but no luck.

2) I will try upgrading to AWS EMR Spark distribution 5.29, but will have to test it.

Has anyone seen similar issues in the past? Suggestions would be highly appreciated.

Thanks
Vivek
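Since the job is pure Map + ElasticsearchIO.write(), one knob worth checking after a 2.5 → 2.18 jump is bulk-request batching: ElasticsearchIO.Write exposes withMaxBatchSize(...) and withMaxBatchSizeBytes(...), and too-small batches mean many small HTTP bulk requests. The buffering behavior is roughly the following — an illustrative plain-Java sketch, not Beam code, and the 1000-doc / 5 MB values in main are assumed, not verified defaults:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the bulk-batching behavior behind ElasticsearchIO.write()'s
// withMaxBatchSize(n) / withMaxBatchSizeBytes(b): documents are buffered
// and flushed as one bulk request when either limit is reached.
class BulkBatcher {
    private final long maxDocs;
    private final long maxBytes;
    private final List<String> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    private int flushes = 0;

    BulkBatcher(long maxDocs, long maxBytes) {
        this.maxDocs = maxDocs;
        this.maxBytes = maxBytes;
    }

    void add(String docJson) {
        buffer.add(docJson);
        bufferedBytes += docJson.length();
        if (buffer.size() >= maxDocs || bufferedBytes >= maxBytes) {
            flush();
        }
    }

    void flush() {
        if (buffer.isEmpty()) return;
        // In the real connector, this is where one _bulk HTTP request goes out.
        flushes++;
        buffer.clear();
        bufferedBytes = 0;
    }

    int flushCount() { return flushes; }

    public static void main(String[] args) {
        BulkBatcher b = new BulkBatcher(1000, 5L * 1024 * 1024); // assumed limits
        for (int i = 0; i < 2500; i++) b.add("{\"doc\":" + i + "}");
        b.flush(); // final partial batch
        System.out.println(b.flushCount()); // prints 3
    }
}
```

Comparing the bulk-request rate between the 2.5 and 2.18 runs (e.g. from ES-side metrics) would show whether batching is actually where the 1h → 9h regression comes from, or whether it is the executor SIGTERMs alone.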