Running Spark with oozie

Oozie 4.2 now supports spark-action.

Example file (configuration tested on EMR 4.2.0):


(Use the master node internal IP instead of localhost in the nameNode and jobTracker)

Validate oozie workflow xml file:

oozie validate workflow.xml

Example workflow.xml file:

<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkFileCopy'>
 <start to='spark-node' />
<action name='spark-node'>
 <spark xmlns="uri:oozie:spark-action:0.1">
 <delete path="/user/${wf:user()}//output-data/spark"/>
 <ok to="end" />
 <error to="fail" />
<kill name="fail">
 <message>Workflow failed, error
 <end name='end' />

Create the defined structure in HDFS and copy the proper files:

hadoop fs -ls /user/hadoop/examples/apps/spark/
Found 3 items
drwxr-xr-x - hadoop hadoop 0 2015-12-18 08:13 /user/hadoop/examples/apps/spark/lib
-rw-r--r-- 1 hadoop hadoop 1920 2015-12-18 08:08 /user/hadoop/examples/apps/spark/workflow.xml

hadoop fs -put workflow.xml /user/hadoop/examples/apps/spark/

hadoop fs -put /usr/share/doc/oozie-4.2.0/examples/apps/spark/lib/oozie-examples.jar /user/hadoop/examples/apps/spark/lib

hadoop fs -mkdir -p /user/hadoop/examples/input-data/text

hadoop fs -mkdir -p /user/hadoop/examples/output-data/spark

hadoop fs -put /usr/share/doc/oozie-4.2.0/examples/input-data/text/data.txt /user/hadoop/examples/input-data/text/


Run your oozie Job:

oozie job --oozie http://localhost:11000/oozie -config ./ -run

Check oozie job:

oozie job -info 0000004-151203092421374-oozie-oozi-W

Check available sharelib:

$ oozie admin -shareliblist -oozie http://localhost:11000/oozie
[Available ShareLib] 





Publicado en Uncategorized | Etiquetado , , | Deja un comentario

Hadoop: Output Commiter Notes

OutputCommitter describes the commit of task output for a MapReduce job.

The MapReduce framework relies on the OutputCommitter of the job to:

  • Set up the job during initialization; for example, create the temporary output directory for the job. Job setup is done by a separate task when the job is in PREP state and after initializing tasks. Once the setup task completes, the job is moved to the RUNNING state.
  • Clean up the job after job completion; for example, remove the temporary output directory. Job cleanup is done by a separate task at the end of the job. The job is declared SUCCEEDED, FAILED, or KILLED after the cleanup task completes.
  • Set up the task temporary output. Task setup is done as part of the same task, during task initialization.
  • Check whether a task needs a commit. This prevents unnecessary commit procedures.
  • Commit the task output. After the task is done, the task commits its output, if required.
  • Discard the task commit. If the task is failed or killed, the output is cleaned up. If the task could not clean up (in an exception block), a separate task is launched with the same attempt ID to do the cleanup.

FileOutputCommitter is the default OutputCommitter. Job setup/cleanup tasks occupy map or reduce containers, whichever is available on the NodeManager. The JobCleanup task, TaskCleanup tasks, and JobSetup task have the highest priority, in that order.

Task Side-Effect Files

In some applications, component tasks need to create or write to side files, which differ from the actual job output files.

In such cases, two instances of the same Mapper or Reducer could be running simultaneously (for example, speculative tasks), trying to open or write to the same file on the file system. You must pick unique names per task-attempt (using the attemptid, say attempt_200709221812_0001_m_000000_0), not just per task.

To avoid these issues, the MapReduce framework, when the OutputCommitter is FileOutputCommitter, maintains a special ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} subdirectory accessible via ${mapreduce.task.output.dir} for each task attempt on the FileSystem where the output of the task attempt is stored.

On successful completion of the task attempt, the files in the ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} (only) are promoted to ${mapreduce.output.fileoutputformat.outputdir}/. The framework discards the subdirectory of unsuccessful task attempts. This process is completely transparent to the application.

You can use this feature by creating any required side files during execution of a task in ${mapreduce.task.output.dir} via FileOutputFormat.getWorkOutputPath(). The framework promotes them similarly for successful task attempts. This eliminates the need to pick unique paths per task attempt.

Note: The value of ${mapreduce.task.output.dir} during execution of a particular task attempt is actually ${mapreduce.output.filetoutputformat.outputdir}/temporary/${taskid}; this value is set by the MapReduce framework. To use this feature, create files in the path returned by FileOutputFormat.getWorkOutputPath() from the MapReduce task.

The entire discussion holds true for maps of jobs with reducer=NONE (that is, 0 reduces) because output of the map, in that case, goes directly to HDFS.


Publicado en Uncategorized | Etiquetado , | Deja un comentario

Human Resources and Rocket Science / Recursos Humanos y la Coheteria

I haven’t managed too large teams in my life. But, being in the team, I’ve learned a simple concept:

Human Resources are not Rocket Science (action/reaction based). If you are not proactive while managing, you will loose the Resource.


Versión en Español

No he administrado grandes equipos de trabajo en my vida, pero he aprendido un concepto simple estando dentro del equipo:

Los Recursos Humanos no estan basados en accion y reaccion como en la coheteria. Si no eres proactivo al administrarlos, perderas el Recurso.

Publicado en Uncategorized | Etiquetado , | Deja un comentario

Puppet: Syntax validation for Hiera yaml files

I need this handy:

ruby -e "require 'yaml'; YAML.load_file('common.yaml')"
Publicado en Uncategorized | Etiquetado , , | Deja un comentario

Spark Notes

Apache Spark, is an open source cluster computing framework originally developed at University of California, Berkeley but was later donated to the Apache Software Foundation where it remains today. In contrast to Hadoop’s two-stage disk-based MapReduce paradigm, Spark’s multi-stage in-memory primitives provides performance up to 100 faster for certain applications.


Spark has a driver program where the application logic execution is started, with multiple workers which processing data in parallel.
The data is typically collocated with the worker and partitioned across the same set of machines within the cluster.  During the execution, the driver program will pass the code/closure into the worker machine where processing of corresponding partition of data will be conducted.
The data will undergoing different steps of transformation while staying in the same partition as much as possible (to avoid data shuffling across machines).  At the end of the execution, actions will be executed at the worker and result will be returned to the driver program.


Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel.


There are currently two types of RDDs:

  • parallelized collections, which take an existing Scala collection and run functions on it in parallel.
  • Hadoop datasets, which run functions on each record of a file in Hadoop distributed file system or any other storage system supported by Hadoop.

Both types of RDDs can be operated on through the same methods. Each application has a driver process which coordinates its execution.

Running Applications:

Spark applications are similar to MapReduce “jobs”. Each application is a self-contained computation which runs some user-supplied code to compute a result. As with MapReduce jobs, Spark applications can make use of the resources of multiple nodes.

This process can run in the foreground (client mode) or in the background (cluster mode). Client mode is a little simpler, but cluster mode allows you to easily log out after starting a Spark application without terminating the application.

Spark starts executors to perform computations. There may be many executors, distributed across the cluster, depending on the size of the job. After loading some of the executors, Spark attempts to match tasks to executors.

Spark can run in two modes:

  • Standalone mode: Spark uses a Master daemon which coordinates the efforts of the Workers, which run the executors. Standalone mode is the default, but it cannot be used on secure clusters.
  • YARN mode: The YARN ResourceManager performs the functions of the Spark Master. The functions of the Workers are performed by the YARN NodeManager daemons, which run the executors. YARN mode is slightly more complex to set up, but it supports security, and provides better integration with YARN’s cluster-wide resource management policies.

Spark Execution Parameters

–num-executors: The –num-executors command-line flag or spark.executor.instances configuration property control the number of executors requested

–executor-cores: This property controls the number of concurrent tasks an executor can run. –executor-cores 5 means that each executor can run a maximum of five tasks at the same time.

Every Spark executor in an application has the same fixed number of cores and same fixed heap size.

The number of cores can be specified with the –executor-cores flag when invoking spark-submit, spark-shell, and pyspark from the command line.

The heap size can be controlled with the –executor-cores flag or the spark.executor.memory property.

–executor-memory: This property controls the executor heap size, but JVMs can also use some memory off heap, for example for interned Strings and direct byte buffers.

The value of the spark.yarn.executor.memoryOverhead property is added to the executor memory to determine the full memory request to YARN for each executor.

It defaults to max(384, .07 * spark.executor.memory).

Application Master:

Is a non-executor container with the special capability of requesting containers from YARN, takes up resources of its own that must be budgeted in. In yarn-client mode, it defaults to a 1024MB and one vcore. In yarn-cluster mode, the application master runs the driver, so it’s often useful to bolster its resources with the –driver-memory and –driver-cores properties.

Determining the number of executors:

It’s important to think about how the resources requested by Spark will fit into what YARN has available:
yarn.nodemanager.resource.memory-mb controls the maximum sum of memory used by the containers on each node.
yarn.nodemanager.resource.cpu-vcores controls the maximum sum of cores used by the containers on each node.


As an example, in a cluster with six nodes running NodeManagers, each equipped with 16 cores and 64GB of memory:
The NodeManager capacities, yarn.nodemanager.resource.memory-mb and yarn.nodemanager.resource.cpu-vcores, should probably be set to: 63 * 1024 = 64512 (megabytes) and 15 respectively

We must avoid allocating 100% of the resources to YARN containers because the node needs some resources to run the OS and Hadoop daemons.

In this case, we leave a gigabyte and a core for these system processes.

An executors configuration approach could be:
 –num-executors 17 –executor-cores 5 –executor-memory 19G.

This config results in 3 executors per node, except for the one with the AM, which will have 2 executors.
(executor-memory was derived as (63/3 executors per node) = 21.  21 * 0.07 = 1.47.  21 – 1.47 ~ 19)


  • Depending on the application size (memory), using small executors (several executors per node) will perform better.
  • If the executors are too tiny (with a single core and just enough memory needed to run a single task, for example) throws away the benefits that come from running multiple tasks in a single JVM.
  • The application master, which is a non-executor container with the special capability of requesting containers from YARN, takes up resources of its own that must be budgeted in. In yarn-client mode, it defaults to a 1024MB and one vcore. In yarn-cluster mode, the application master runs the driver, so it’s often useful to bolster its resources with the --driver-memory and --driver-cores properties.


  • In yarn-cluster mode, the application master runs the driver, so it’s useful to bolster its resources with the –driver-memory and –driver-cores properties.
  • Running executors with too much memory often results in excessive garbage collection delays.


Publicado en Uncategorized | Etiquetado , | Deja un comentario

How Apache Tez works

Tez enables developers to build end-user applications with much better performance and flexibility. It generalizes the MapReduce paradigm to a more powerful framework based on expressing computations as a dataflow graph.

The is designed to get around limitations imposed by MapReduce. Other than being limited to writing mappers and reducers, there are other inefficiencies in force-fitting all kinds of computations into this paradigm – for e.g. HDFS is used to store temporary data between multiple MR jobs, which is an overhead. In Hive, this is common when queries require multiple shuffles on keys without correlation, such as with join – grp by – window function – order by.


The Tez API has the following components:

DAG: this defines the overall job. The user creates a DAG object for each data processing job.
Vertex: this defines the user logic and the resources & environment needed to execute the user logic. The user creates a Vertex object for each step in the job and adds it to the DAG.
Edge: this defines the connection between producer and consumer vertices. The user creates an Edge object and connects the producer and consumer vertices using it.

It allows you, for e.g., instead of using multiple MapReduce jobs, you can use the MRR pattern, such that a single map has multiple reduce stages; this can allow streaming of data from one processor to another to another, without writing anything to HDFS (it will be written to disk only for check-pointing), leading to much better performance.


Re-use containers:
Tez follows the traditional Hadoop model of dividing a job into individual tasks, all of which are run as processes via YARN, on the users’ behalf. This model comes with inherent costs for process startup and initialization, handling stragglers and allocating each container via the YARN resource manager.


Publicado en Uncategorized | Etiquetado , | Deja un comentario

yarn: execute a script on all the nodes in the cluster

This is more Linux script related, but, sometimes we have a Hadoop (YARN) cluster running and we need to run a post install script or activity that executes on all the nodes in the cluster:

for i in `yarn node --list | cut -f 1 -d ':' | grep "ip"`; do ssh -i your-key.pem hadoop@$i 'hadoop fs -copyToLocal s3://mybucket/ | chmod +x /home/hadoop/ | /home/hadoop/' ; done

Note: we will need the your-key.pem file in the master node.

Publicado en Uncategorized | Etiquetado , , , , | Deja un comentario