Get the driver’s IP in Spark yarn-cluster mode


In cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.

Sometimes we will have a bunch of logs for a terminated cluster and we need to find out which node was the driver in cluster mode.

Searching the application/container logs for “driverUrl” reveals it:

find . -iname "*.gz" | xargs zgrep "driverUrl"
./container_1459071485818_0006_02_000001/stderr.gz:15/03/28 05:10:47 INFO YarnAllocator: Launching ExecutorRunnable. driverUrl: spark://CoarseGrainedScheduler@172.31.16.15:47452,  executorHostname: ip-172-31-16-13.ec2.internal
...
./container_1459071485818_0006_02_000001/stderr.gz:15/03/28 05:10:47 INFO YarnAllocator: Launching ExecutorRunnable. driverUrl: spark://CoarseGrainedScheduler@172.31.16.15:47452,  executorHostname: ip-172-31-16-14.ec2.internal

In this case the driver was running on 172.31.16.15.
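The same search can be scripted when there are many log bundles to go through. The following is only a minimal sketch: the function names are made up for illustration, and the regular expression assumes the log line format shown above.

```python
import gzip
import re
from pathlib import Path

# Captures host and port from "driverUrl: spark://<name>@<host>:<port>"
DRIVER_URL = re.compile(r"driverUrl: spark://[^@\s]+@([^:\s,]+):(\d+)")


def driver_hosts_in_text(text):
    """Return the set of (host, port) pairs found in a block of log text."""
    return {(host, int(port)) for host, port in DRIVER_URL.findall(text)}


def driver_hosts_in_logs(root):
    """Scan every *.gz file under root, similar to the find | zgrep pipeline.

    Note: rglob("*.gz") is case-sensitive on Linux, unlike find -iname.
    """
    found = set()
    for path in Path(root).rglob("*.gz"):
        with gzip.open(path, "rt", errors="replace") as handle:
            found |= driver_hosts_in_text(handle.read())
    return found
```

Calling driver_hosts_in_logs(".") from the directory that holds the container logs should return one (host, port) pair per driver found.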


Consider boosting spark.yarn.executor.memoryOverhead


This is a very specific error that comes from how the Spark executor and its YARN container share memory.

You will typically see errors like this one in the application container logs:

15/03/12 18:53:46 WARN YarnAllocator: Container killed by YARN for exceeding memory limits. 9.3 GB of 9.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
15/03/12 18:53:46 ERROR YarnClusterScheduler: Lost executor 21 on ip-xxx-xx-xx-xx: Container killed by YARN for exceeding memory limits. 9.3 GB of 9.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

To overcome this, keep in mind how the YARN container and the executor are laid out in memory: the container has to hold the executor memory plus the memory overhead.

[Figure: spark-tuning-yarn-memory]

The memory used by the Spark executor exceeds the predefined limit (often because of a few spikes), and YARN kills the container with the error message shown above.

By default, the ‘spark.yarn.executor.memoryOverhead’ parameter is set to 384 MB. This can be too low, depending on your application and the data load.

A suggested value for this parameter is ‘executorMemory * 0.10’, without going below the 384 MB minimum.

We can increase the value of ‘spark.yarn.executor.memoryOverhead’ to 1 GB on spark-submit by adding this to the command line:

--conf spark.yarn.executor.memoryOverhead=1024
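To pick a value, the rule of thumb above can be worked out with a few lines of Python. This is a rough sketch, not Spark's own code: the function names are hypothetical, and it assumes the suggested 10% factor with the 384 MB floor.

```python
MIN_OVERHEAD_MB = 384    # default floor mentioned above
OVERHEAD_FACTOR = 0.10   # suggested fraction of executor memory


def suggested_overhead_mb(executor_memory_mb):
    """Return max(384, 10% of executor memory), in whole megabytes."""
    return max(MIN_OVERHEAD_MB, int(executor_memory_mb * OVERHEAD_FACTOR))


def container_request_mb(executor_memory_mb, overhead_mb=None):
    """Memory the container needs: executor memory plus overhead."""
    if overhead_mb is None:
        overhead_mb = suggested_overhead_mb(executor_memory_mb)
    return executor_memory_mb + overhead_mb


print(suggested_overhead_mb(2048))       # small executor: the floor applies
print(container_request_mb(8192, 1024))  # 8 GB executor with 1 GB overhead
```

With an 8 GB executor, 10% is about 819 MB, so setting the overhead to 1024 as in the command line above leaves some extra headroom for spikes.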

For reference, this fix was added in Jira issue 1930:

+  <td><code>spark.yarn.executor.memoryOverhead</code></td>

 


Compile Scala program with sbt


Install sbt:

curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
sudo yum install sbt

Compile & Build

Place build.sbt and the .scala program in the same directory and run:

sbt package

 

 


Running Spark with Oozie


Oozie 4.2 now supports spark-action.

Example job.properties file (configuration tested on EMR 4.2.0):

nameNode=hdfs://172.31.25.17:8020 
jobTracker=172.31.25.17:8032 
master=local[*] 
queueName=default 
examplesRoot=examples 
oozie.use.system.libpath=true 
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/spark

(Use the master node’s internal IP instead of localhost for nameNode and jobTracker.)
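To see which HDFS path the application path points to, the ${...} placeholders can be expanded by hand. The sketch below imitates that substitution in Python. It is not how Oozie itself is implemented, the helper names are hypothetical, and user.name is assumed to be hadoop.

```python
import re

JOB_PROPERTIES = """\
nameNode=hdfs://172.31.25.17:8020
jobTracker=172.31.25.17:8032
master=local[*]
queueName=default
examplesRoot=examples
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/spark
"""

PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")


def parse_properties(text):
    """Parse simple key=value lines, ignoring blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith("#"):
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props


def resolve(value, props):
    """Expand ${name} placeholders; unknown names are left untouched."""
    def lookup(match):
        return props.get(match.group(1), match.group(0))

    for _ in range(10):  # bounded, so circular references cannot loop forever
        expanded = PLACEHOLDER.sub(lookup, value)
        if expanded == value:
            break
        value = expanded
    return value


props = parse_properties(JOB_PROPERTIES)
props["user.name"] = "hadoop"  # assumption: the user submitting the job
print(resolve(props["oozie.wf.application.path"], props))
```

The printed path is the HDFS directory where workflow.xml has to be uploaded.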

Validate the Oozie workflow XML file:

oozie validate workflow.xml

Example workflow.xml file:

<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkFileCopy'>
    <start to='spark-node' />
    <action name='spark-node'>
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="/user/${wf:user()}/${examplesRoot}/output-data/spark"/>
            </prepare>
            <master>${master}</master>
            <name>Spark-FileCopy</name>
            <class>org.apache.oozie.example.SparkFileCopy</class>
            <jar>/user/${wf:user()}/${examplesRoot}/apps/spark/lib/oozie-examples.jar</jar>
            <arg>/user/${wf:user()}/${examplesRoot}/input-data/text/data.txt</arg>
            <arg>/user/${wf:user()}/${examplesRoot}/output-data/spark</arg>
        </spark>
        <ok to="end" />
        <error to="fail" />
    </action>
    <kill name="fail">
        <message>Workflow failed, error
            message[${wf:errorMessage(wf:lastErrorNode())}]
        </message>
    </kill>
    <end name='end' />
</workflow-app>

Create the defined structure in HDFS and copy the proper files:

hadoop fs -ls /user/hadoop/examples/apps/spark/
Found 3 items
drwxr-xr-x - hadoop hadoop 0 2015-12-18 08:13 /user/hadoop/examples/apps/spark/lib
-rw-r--r-- 1 hadoop hadoop 1920 2015-12-18 08:08 /user/hadoop/examples/apps/spark/workflow.xml

hadoop fs -put workflow.xml /user/hadoop/examples/apps/spark/

hadoop fs -put /usr/share/doc/oozie-4.2.0/examples/apps/spark/lib/oozie-examples.jar /user/hadoop/examples/apps/spark/lib

hadoop fs -mkdir -p /user/hadoop/examples/input-data/text

hadoop fs -mkdir -p /user/hadoop/examples/output-data/spark

hadoop fs -put /usr/share/doc/oozie-4.2.0/examples/input-data/text/data.txt /user/hadoop/examples/input-data/text/

 

Run your Oozie job:

oozie job --oozie http://localhost:11000/oozie -config ./job.properties -run

Check the Oozie job:

oozie job -info 0000004-151203092421374-oozie-oozi-W

Check available sharelib:

$ oozie admin -shareliblist -oozie http://localhost:11000/oozie
[Available ShareLib] 
oozie 
hive 
distcp 
hcatalog 
sqoop 
mapreduce-streaming 
spark 
hive2 
pig

 

 

References:

https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html

 


Hadoop: OutputCommitter Notes


OutputCommitter describes the commit of task output for a MapReduce job.

The MapReduce framework relies on the OutputCommitter of the job to:

  • Set up the job during initialization; for example, create the temporary output directory for the job. Job setup is done by a separate task when the job is in PREP state and after initializing tasks. Once the setup task completes, the job is moved to the RUNNING state.
  • Clean up the job after job completion; for example, remove the temporary output directory. Job cleanup is done by a separate task at the end of the job. The job is declared SUCCEEDED, FAILED, or KILLED after the cleanup task completes.
  • Set up the task temporary output. Task setup is done as part of the same task, during task initialization.
  • Check whether a task needs a commit. This prevents unnecessary commit procedures.
  • Commit the task output. After the task is done, the task commits its output, if required.
  • Discard the task commit. If the task is failed or killed, the output is cleaned up. If the task could not clean up (in an exception block), a separate task is launched with the same attempt ID to do the cleanup.
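The order of the responsibilities listed above can be sketched with a toy committer that only records which step was called. This is an illustration, not Hadoop code: the class, the Python-style method names and the sequential driver are all simplifications made up for this example.

```python
class RecordingCommitter:
    """Toy stand-in for an OutputCommitter that records each call."""

    def __init__(self):
        self.calls = []

    def setup_job(self):
        self.calls.append("setup_job")

    def setup_task(self, attempt):
        self.calls.append(f"setup_task:{attempt}")

    def needs_task_commit(self, attempt):
        # Assumption for this sketch: every attempt produced output.
        return True

    def commit_task(self, attempt):
        self.calls.append(f"commit_task:{attempt}")

    def abort_task(self, attempt):
        self.calls.append(f"abort_task:{attempt}")

    def cleanup_job(self):
        self.calls.append("cleanup_job")


def run_job(committer, attempts):
    """Drive the committer; attempts maps attempt name -> succeeded flag."""
    committer.setup_job()
    for attempt, succeeded in attempts.items():
        committer.setup_task(attempt)
        if not succeeded:
            committer.abort_task(attempt)    # failed or killed: discard output
        elif committer.needs_task_commit(attempt):
            committer.commit_task(attempt)   # successful: commit output
    committer.cleanup_job()
    return committer.calls
```

Running run_job(RecordingCommitter(), {"m_0_0": False, "m_0_1": True}) shows the failed attempt being aborted and the retry being committed, with job setup and cleanup wrapping both.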

FileOutputCommitter is the default OutputCommitter. Job setup/cleanup tasks occupy map or reduce containers, whichever is available on the NodeManager. The JobCleanup task, TaskCleanup tasks, and JobSetup task have the highest priority, in that order.

Task Side-Effect Files

In some applications, component tasks need to create or write to side files, which differ from the actual job output files.

In such cases, two instances of the same Mapper or Reducer could be running simultaneously (for example, speculative tasks), trying to open or write to the same file on the file system. Without help from the framework, the application would have to pick unique names per task attempt (using the attempt ID, say attempt_200709221812_0001_m_000000_0), not just per task.

To avoid these issues, the MapReduce framework, when the OutputCommitter is FileOutputCommitter, maintains a special ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} subdirectory accessible via ${mapreduce.task.output.dir} for each task attempt on the FileSystem where the output of the task attempt is stored.

On successful completion of the task attempt, the files in the ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} (only) are promoted to ${mapreduce.output.fileoutputformat.outputdir}/. The framework discards the subdirectory of unsuccessful task attempts. This process is completely transparent to the application.
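The promotion step can be imitated on a local file system to make the idea concrete. The sketch below is not FileOutputCommitter itself. The function names are hypothetical, and it only mirrors the _temporary/_<attempt> layout described above using a temporary local directory.

```python
import shutil
import tempfile
from pathlib import Path


def task_work_dir(output_dir, attempt_id):
    """Per-attempt scratch directory under <output>/_temporary/_<attempt>."""
    work = Path(output_dir) / "_temporary" / f"_{attempt_id}"
    work.mkdir(parents=True, exist_ok=True)
    return work


def commit_attempt(output_dir, attempt_id):
    """Promote the attempt's files into the output directory."""
    work = Path(output_dir) / "_temporary" / f"_{attempt_id}"
    for item in work.iterdir():
        shutil.move(str(item), str(Path(output_dir) / item.name))
    work.rmdir()


def abort_attempt(output_dir, attempt_id):
    """Discard everything the attempt wrote."""
    shutil.rmtree(Path(output_dir) / "_temporary" / f"_{attempt_id}",
                  ignore_errors=True)


def demo():
    """Two speculative attempts write the same file name; one is promoted."""
    with tempfile.TemporaryDirectory() as output_dir:
        for attempt in ("attempt_0", "attempt_1"):
            work = task_work_dir(output_dir, attempt)
            (work / "side.txt").write_text(attempt)
        commit_attempt(output_dir, "attempt_0")
        abort_attempt(output_dir, "attempt_1")
        promoted = (Path(output_dir) / "side.txt").read_text()
        scratch = Path(output_dir) / "_temporary"
        leftovers = sorted(p.name for p in scratch.iterdir())
        return promoted, leftovers
```

Both attempts write side.txt without colliding because each one works in its own subdirectory; only the committed attempt's file ends up in the output directory.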

You can use this feature by creating any required side files during execution of a task in ${mapreduce.task.output.dir} via FileOutputFormat.getWorkOutputPath(). The framework promotes them similarly for successful task attempts. This eliminates the need to pick unique paths per task attempt.

Note: The value of ${mapreduce.task.output.dir} during execution of a particular task attempt is actually ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}; this value is set by the MapReduce framework. To use this feature, create files in the path returned by FileOutputFormat.getWorkOutputPath() from the MapReduce task.

The entire discussion holds true for maps of jobs with reducer=NONE (that is, 0 reduces) because output of the map, in that case, goes directly to HDFS.

References: http://www.cloudera.com/content/www/en-us/documentation/other/tutorial/CDH5/Hadoop-Tutorial/ht_job_output.html


Human Resources and Rocket Science / Recursos Humanos y la Coheteria


I haven’t managed very large teams in my life. But, being part of a team, I’ve learned a simple concept:

Human Resources are not Rocket Science (action/reaction based). If you are not proactive while managing, you will lose the Resource.

 

Versión en Español

No he administrado grandes equipos de trabajo en mi vida, pero he aprendido un concepto simple estando dentro del equipo:

Los Recursos Humanos no están basados en acción y reacción como en la cohetería. Si no eres proactivo al administrarlos, perderás el Recurso.


Puppet: Syntax validation for Hiera yaml files


I need this handy:

ruby -e "require 'yaml'; YAML.load_file('common.yaml')"