If you want to explore how to parallelize data ingestion into Elasticsearch, take a look at this post I wrote for AWS:
Cascading Source Code is available here.
Ganglia is a scalable distributed monitoring system for high-performance computing systems such as clusters and Grids. It is based on a hierarchical design targeted at federations of clusters. It leverages widely used technologies such as XML for data representation, XDR for compact, portable data transport, and RRDtool for data storage and visualization.
1. Ganglia Monitoring Daemon (gmond)
Gmond stands for ganglia monitoring daemon. It is a lightweight service that is installed on every machine you’d like to monitor.
Gmond has four main responsibilities:
1.1 Monitor changes in host state.
1.2 Announce relevant changes.
1.3 Listen to the state of all other ganglia nodes via a unicast or multicast channel.
1.4 Answer requests for an XML description of the cluster state.
Each gmond transmits information in two different ways:
a. Unicasting or Multicasting host state in external data representation (XDR) format using UDP messages.
b. Sending XML over a TCP connection.
Notes about gmond:
– The main configuration file of gmond is /etc/gmond.conf
– gmond is multithreaded
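To make the XML-over-TCP channel more concrete, here is a minimal Python sketch that parses a gmond-style XML dump into a dictionary. The sample document is a hypothetical, heavily trimmed example (real dumps carry many more attributes and metrics), and the function name parse_cluster_state is my own. On a live system you would read the XML from the TCP port configured in gmond's tcp_accept_channel instead of using a hard-coded string.

```python
import xml.etree.ElementTree as ET

# Hypothetical, heavily trimmed sample of the XML a gmond returns over TCP.
# Real dumps contain many more attributes and metrics per host.
SAMPLE_XML = """
<GANGLIA_XML VERSION="3.1.7" SOURCE="gmond">
  <CLUSTER NAME="demo-cluster">
    <HOST NAME="node1" IP="10.0.0.1">
      <METRIC NAME="load_one" VAL="0.42" TYPE="float" UNITS=""/>
      <METRIC NAME="mem_free" VAL="2048000" TYPE="float" UNITS="KB"/>
    </HOST>
    <HOST NAME="node2" IP="10.0.0.2">
      <METRIC NAME="load_one" VAL="1.10" TYPE="float" UNITS=""/>
    </HOST>
  </CLUSTER>
</GANGLIA_XML>
"""


def parse_cluster_state(xml_text):
    """Return {host_name: {metric_name: raw_value_string}}."""
    root = ET.fromstring(xml_text.strip())
    state = {}
    for host in root.iter("HOST"):
        # Each METRIC element carries its name and value as attributes.
        metrics = {m.get("NAME"): m.get("VAL") for m in host.iter("METRIC")}
        state[host.get("NAME")] = metrics
    return state


state = parse_cluster_state(SAMPLE_XML)
print(state["node1"]["load_one"])
```

Values are kept as raw strings here; converting them according to the TYPE attribute is left out to keep the sketch short.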
2. Ganglia Meta Daemon (gmetad)
The ganglia meta daemon (gmetad) is a service that collects data from other gmetad and gmond sources and stores their state to disk in indexed round-robin (RRD) databases. Gmetad provides a simple query mechanism for collecting specific information about groups of machines.
Notes about gmetad:
– the main configuration file for gmetad is /etc/gmetad.conf
– You need at least one node running the gmetad daemon in each cluster.
– This gmetad daemon collects the data sent by the gmond daemons.
– The other nodes in the cluster do not need gmetad installed.
– If you also want to monitor the machine that runs gmetad, install both gmond and gmetad on it.
3. Ganglia PHP Web Front-end
The Ganglia web front-end provides a view of the gathered information via real-time dynamic web pages. Most importantly, it displays Ganglia data in a meaningful way for system administrators and computer users using PHP.
In this picture we can see gmond installed on each node, sending data to gmetad installed on a “gmetad node”. We can have one or more “nodes with gmetad” in a cluster.
gmetad collects all the data from gmond and stores it in an RRDtool database. The PHP scripts then read that data and render it as shown in the first picture in this article.
4. Ganglia Metric Tool (gmetric)
The Ganglia metric tool (gmetric) is a command-line application that you can use to inject custom metrics about hosts that are being monitored by Ganglia. It can spoof messages as coming from a different host, which is useful when you want to capture and report metrics from a device where gmond is not running (such as a network device or other embedded device).
5. Ganglia Stat Tool (gstat)
The Ganglia stat tool (gstat) is a command-line application that you can use to query a gmond for metric information directly.
6. RRD tool:
Ganglia uses RRDtool to store and visualize its data.
RRDtool is short for Round Robin Database tool, an open source tool that stores data as time series. For example, RRDtool stores CPU load values at a fixed time interval and then graphs them over time.
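The "round robin" part is easy to picture with a fixed-size buffer: once it is full, every new sample overwrites the oldest one, so the database never grows. The sketch below is only an illustration of that idea in Python. The class name RoundRobinArchive is hypothetical, and real RRDtool additionally consolidates samples (for example into averages) across several archives, which is not modeled here.

```python
from collections import deque


class RoundRobinArchive:
    """Fixed-size time series: once full, the oldest sample is dropped."""

    def __init__(self, slots):
        # deque with maxlen discards the oldest entry on overflow.
        self.samples = deque(maxlen=slots)

    def update(self, timestamp, value):
        self.samples.append((timestamp, value))

    def series(self):
        return list(self.samples)


# Store one CPU load sample per minute in an archive with three slots.
rra = RoundRobinArchive(slots=3)
for minute, load in enumerate([0.2, 0.5, 0.9, 1.4]):
    rra.update(minute * 60, load)
print(rra.series())
```

After the fourth update only the three most recent samples remain, which is why storage stays constant no matter how long the monitoring runs.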
1) Build the package with the provided pom.xml:
$ mvn package
2) Rebuild the RPM structure:
$ mvn -DskipTests=true rpm:rpm
A structure like the following will be created:
/target/rpm/<app_name>/BUILD
/target/rpm/<app_name>/RPMS
/target/rpm/<app_name>/SOURCES
/target/rpm/<app_name>/SPECS
/target/rpm/<app_name>/SRPMS
1) Connect to HBase. Connect to your running instance of HBase using the hbase shell command, located in the bin/ directory of your HBase install.
$ ./bin/hbase shell
hbase(main):001:0>
2) Create a table. Use the create command to create a new table. You must specify the table name and the ColumnFamily name.
hbase(main):001:0> create 'test', 'cf'
0 row(s) in 0.4170 seconds
=> Hbase::Table - test
3) List Information About your Table.
hbase(main):002:0> list 'test'
TABLE
test
1 row(s) in 0.0180 seconds
=> ["test"]
4) Put data into your table. Here, we insert three values, one at a time. The first insert is at row1, column cf:a, with a value of value1. Columns in HBase are comprised of a column family prefix, cf in this example, followed by a colon and then a column qualifier suffix, a in this case.
hbase(main):003:0> put 'test', 'row1', 'cf:a', 'value1'
0 row(s) in 0.0850 seconds
hbase(main):004:0> put 'test', 'row2', 'cf:b', 'value2'
0 row(s) in 0.0110 seconds
hbase(main):005:0> put 'test', 'row3', 'cf:c', 'value3'
0 row(s) in 0.0100 seconds
5) Scan the table for all data at once. One of the ways to get data from HBase is to scan. Use the scan command to scan the table for data. You can limit your scan, but for now, all data is fetched.
hbase(main):006:0> scan 'test'
ROW     COLUMN+CELL
 row1   column=cf:a, timestamp=1421762485768, value=value1
 row2   column=cf:b, timestamp=1421762491785, value=value2
 row3   column=cf:c, timestamp=1421762496210, value=value3
3 row(s) in 0.0230 seconds
6) Get a single row of data.
hbase(main):007:0> get 'test', 'row1'
COLUMN  CELL
 cf:a   timestamp=1421762485768, value=value1
1 row(s) in 0.0350 seconds
7) Disable a table. If you want to delete a table or change its settings, as well as in some other situations, you need to disable the table first, using the disable command. You can re-enable it using the enable command.
hbase(main):008:0> disable 'test'
0 row(s) in 1.1820 seconds
hbase(main):009:0> enable 'test'
0 row(s) in 0.1770 seconds
8) Disable the table again if you tested the enable command above:
hbase(main):010:0> disable 'test'
0 row(s) in 1.1820 seconds
9) Drop the table.
hbase(main):011:0> drop 'test'
0 row(s) in 0.1370 seconds
10) Backup and restore to S3:
hadoop jar /home/hadoop/lib/hbase.jar emr.hbase.backup.Main --backup --backup-dir s3://your-bucket/backups/j-XXXX
hadoop jar /home/hadoop/lib/hbase.jar emr.hbase.backup.Main --restore --backup-dir s3://your-bucket/backup-hbase/j-XXXX
hbase org.apache.hadoop.hbase.mapreduce.Import test s3n://your-bucket/backup-hbase/j-XXXX
11) Backup and restore with DistCp and S3DistCp:
11.1) Using DistCp to back up to S3:
hadoop distcp hdfs://ec2-52-16-22-167.eu-west-1.compute.amazonaws.com:9000/hbase/ s3://your-bucket/hbase/201502280715/
11.2) Using DistCp to back up to another cluster:
hadoop distcp hdfs://ec2-52-16-22-167.eu-west-1.compute.amazonaws.com:9000/hbase/ hdfs://ec2-54-86-229-249.compute-1.amazonaws.com:9000/hbase/
11.3) Using S3DistCp to back up to S3:
hadoop jar ~/lib/emr-s3distcp-1.0.jar --src hdfs:///hbase/ --dest s3://your-bucket/hbase/201502280747/
Handling JSON files with Hive is not always an easy task.
If you need to extract specific fields from structured JSON, there are some alternatives:
Two built-in UDFs are usually helpful in these cases: ‘get_json_object’ and ‘json_tuple’. These functions let you access JSON fields from Hive without installing additional libraries.
To navigate a JSON structure with get_json_object, the table has to expose each JSON record as a single string column.
1) Create the external table as string:
CREATE EXTERNAL TABLE json_table (str string) LOCATION 's3://mybucket/input/jsonserde' ;
2) Select the store.fruit field:
select get_json_object(json_table.str, '$.store.fruit') as MyField from json_table;
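If the path syntax is unfamiliar, the following Python sketch mimics what a simple path such as '$.store.fruit' selects from one JSON record. It is only an illustration: the helper get_path and the sample record are hypothetical, and the sketch handles plain dotted keys only, not the array indexes or wildcards that get_json_object also accepts.

```python
import json


def get_path(json_text, path):
    """Resolve a simple '$.a.b' path; return None when a key is missing."""
    if not path.startswith("$"):
        raise ValueError("path must start with '$'")
    node = json.loads(json_text)
    for key in path[1:].split("."):
        if not key:
            continue  # skip the empty piece produced by the leading dot
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


# Hypothetical record, standing in for one row of the str column.
row = '{"store": {"fruit": [{"type": "apple", "weight": 8}], "bicycle": {"price": 19.95}}}'
print(get_path(row, "$.store.fruit"))
print(get_path(row, "$.store.bicycle.price"))
```

A missing key yields None here, which plays the role of the NULL that Hive returns when the path does not match.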
If you need to add Elasticsearch and Kibana to EMR, take a look at this post I wrote for AWS:
It contains all the steps to launch a cluster and run basic tests on both tools.
Additionally, here you will find the source code for the bootstrap actions used to configure Elasticsearch and Kibana on the EMR Hadoop cluster:
The challenge that architects and developers face today is how to process large volumes of data in a timely, cost effective, and reliable manner. There are several NoSQL solutions in the market today, and choosing the right one for your use case can be difficult.
Both Amazon DynamoDB and Apache HBase are available in the Amazon Web Services (AWS) cloud.
Amazon DynamoDB is a fully managed NoSQL database service that provides fast and predictable performance with seamless scalability. Amazon DynamoDB offers the following benefits:
– No administrative overhead—Amazon DynamoDB manages the burdens of hardware provisioning, setup and configuration, replication, cluster scaling, hardware and software updates, and monitoring and handling of hardware failures.
– Virtually unlimited throughput and scale—The provisioned throughput model of Amazon DynamoDB allows you to specify throughput capacity to serve nearly any level of request traffic. With Amazon DynamoDB, there is virtually no limit to the amount of data that can be stored and retrieved.
– Elasticity and flexibility: Amazon DynamoDB can handle unpredictable workloads with predictable performance and still maintain a stable latency profile that shows no latency increase or throughput decrease as the data volume rises.
– Integration with other AWS services: DynamoDB integrates with other AWS services such as AWS Identity and Access Management (IAM) to control access to DynamoDB resources, Amazon CloudWatch to monitor a variety of DynamoDB performance metrics, Amazon Kinesis for real-time data ingestion, Amazon Simple Storage Service (Amazon S3) for Internet storage, Amazon EMR to provide advanced analytics capabilities, Amazon Redshift to provide business intelligence capabilities, and AWS Data Pipeline to automate data-driven workflows.
Apache HBase, a Hadoop database, offers the following benefits:
– Efficient storage of sparse data—Apache HBase provides fault-tolerant storage for large quantities of sparse data using column-based compression. Apache HBase is capable of storing and processing billions of rows and millions of columns per row.
– Store for high frequency counters—Apache HBase is suitable for tasks such as high-speed counter aggregation because of its consistent reads and writes.
– High write throughput and update rates—Apache HBase supports low latency lookups and range scans, efficient updates and deletions of individual records, and high write throughput.
– Support for multiple Hadoop jobs—The Apache HBase data store allows data to be used by one or more Hadoop jobs on a single cluster or across multiple Hadoop clusters.
DynamoDB uses a provisioned throughput model to process data. During table creation time, it automatically partitions and reserves the appropriate amount of resources to meet your specified throughput requirements.
To decide on the required read and write throughput values for a table, consider the following factors:
– Item size: The read and write capacity units that you specify are based on a predefined data item size per read or per write operation.
– Expected read and write request rates: You must also determine the expected number of read and write operations your application will perform against the table, per second.
– Consistency: Whether your application requires strongly consistent or eventually consistent reads is a factor in determining how many read capacity units you need to provision for your table.
– Local secondary indexes: Queries against indexes consume provisioned read throughput.
– Global secondary indexes: The provisioned throughput settings of a global secondary index are separate from those of its parent table. Therefore, the expected workload on the global secondary index also needs to be taken into consideration when specifying the read and write capacity at index creation time.
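To see how item size, request rate and consistency combine, here is a rough Python sketch of the arithmetic. It assumes the unit sizes given in the DynamoDB documentation (one read capacity unit covers one strongly consistent read per second of an item up to 4 KB, an eventually consistent read costs half of that, and one write capacity unit covers one write per second of an item up to 1 KB). The function names are my own, and indexes are not included in the calculation.

```python
import math

READ_UNIT_BYTES = 4 * 1024   # assumed: one read unit covers up to 4 KB
WRITE_UNIT_BYTES = 1024      # assumed: one write unit covers up to 1 KB


def read_capacity_units(item_bytes, reads_per_second, strongly_consistent):
    """Estimate read capacity units for a steady read rate."""
    units_per_read = math.ceil(item_bytes / READ_UNIT_BYTES)
    total = units_per_read * reads_per_second
    if not strongly_consistent:
        total = total / 2  # eventually consistent reads cost half
    return math.ceil(total)


def write_capacity_units(item_bytes, writes_per_second):
    """Estimate write capacity units for a steady write rate."""
    return math.ceil(item_bytes / WRITE_UNIT_BYTES) * writes_per_second


# 6 KB items, 100 reads per second, 20 writes per second.
print(read_capacity_units(6 * 1024, 100, strongly_consistent=True))
print(read_capacity_units(6 * 1024, 100, strongly_consistent=False))
print(write_capacity_units(6 * 1024, 20))
```

With these assumptions a 6 KB item needs two read units per strongly consistent read, so 100 reads per second come to 200 units (100 if eventually consistent), and 20 writes per second come to 120 write units.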
Although read and write requirements are specified at table creation time, DynamoDB lets you increase or decrease the provisioned throughput to accommodate load with no downtime.
In HBase, the number of nodes in a cluster can be driven by the required throughput for reads and/or writes. The available throughput on a given node can vary depending on the data, specifically:
– Key/value sizes
– Data access patterns
– Cache hit rates
– Node and system configuration
You should plan for peak load if load will likely be the primary factor that increases node count within an HBase cluster.
Eventually consistent reads are the default in DynamoDB and maximize read throughput. However, DynamoDB lets you specify the desired consistency for each read request within an application: a read can be either eventually consistent or strongly consistent.
Please note: An eventually consistent read might not always reflect the results of a recently completed write. Consistency across all copies of data is usually reached within a second.
A strongly consistent read in DynamoDB returns a result that reflects all writes that received a successful response prior to the read.
HBase reads and writes are strongly consistent. This means that all reads and writes to a single row in Apache HBase are atomic. Each concurrent reader and writer can make safe assumptions about the state of a row. Multi-versioning and time stamping in Apache HBase contribute to its strongly consistent model.
In summary, DynamoDB and HBase have similar data processing models in that they both support only atomic single-row transactions. Both databases also provide batch operations for bulk data processing across multiple rows and tables.
One key difference between the two databases is the flexible provisioned throughput model of Amazon DynamoDB. The ability to dial up capacity when you need it and dial it back down when you are done is useful for processing variable workloads with unpredictable peaks.
For workloads that need high update rates to perform data aggregations or maintain counters, Apache HBase is a good choice. This is because Apache HBase supports a multi-version concurrency control mechanism, which contributes to its strongly consistent reads and writes. Amazon DynamoDB gives you the flexibility to specify whether you want your read request to be eventually consistent or strongly consistent depending on your specific workload.
DynamoDB stores three geographically distributed replicas of each table to enable high availability and data durability within a region.
Data is auto-partitioned primarily using the hash key: as throughput and data size increase, it will automatically repartition and reallocate data across more nodes.
Partitions in DynamoDB are fully independent, resulting in a shared nothing cluster. However, provisioned throughput is divided evenly across the partitions.
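The practical consequence of the even split is easier to see with numbers. The Python sketch below is an illustration only: DynamoDB's internal hash function is not public, so MD5 is used as a stand-in, and both function names are hypothetical.

```python
import hashlib


def partition_for(hash_key, partition_count):
    """Map a hash key to a partition (MD5 is a stand-in hash)."""
    digest = hashlib.md5(hash_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % partition_count


def per_partition_throughput(provisioned_units, partition_count):
    """Provisioned throughput is divided evenly across partitions."""
    return provisioned_units / partition_count


# A table provisioned with 1000 units that is spread over 4 partitions.
print(per_partition_throughput(1000, 4))
print(partition_for("user-42", 4))
```

Under this model each partition can serve only 250 of the 1000 provisioned units, so a workload that concentrates on a few hash keys can be throttled well before the table-level figure is reached.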
A region is the basic unit of scalability and load balancing in HBase. Region splitting and subsequent load-balancing follow this sequence of events:
1. Initially there is only one region for a table, and as more data is added to it, the system monitors the load to ensure that the configured maximum size is not exceeded.
2. If the region size exceeds the configured limit, the system dynamically splits the region into two at the row key in the middle of the region, creating two roughly equal halves.
3. The master then schedules the new regions to be moved off to other servers for load balancing, if required.
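The split step in this sequence can be sketched in a few lines of Python. This is a simplified model, not HBase's implementation: a region is represented as a sorted list of (row key, size) pairs, the split point is the middle row rather than the midpoint of the largest store file, and the function names are my own.

```python
def split_region(region):
    """Split a sorted list of (row_key, size_bytes) at the middle row key."""
    middle = len(region) // 2
    return region[:middle], region[middle:]


def maybe_split(regions, max_region_bytes):
    """Split every region whose total size exceeds the configured limit."""
    result = []
    for region in regions:
        size = sum(size_bytes for _, size_bytes in region)
        if size > max_region_bytes and len(region) > 1:
            result.extend(split_region(region))
        else:
            result.append(region)
    return result


# One region holding four rows of 40 bytes each, with a 100 byte limit.
table = [[("row1", 40), ("row2", 40), ("row3", 40), ("row4", 40)]]
table = maybe_split(table, max_region_bytes=100)
print(table)
```

The single 160-byte region becomes two 80-byte regions covering contiguous key ranges, which the master could then assign to different servers.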
Behind the scenes, Zookeeper tracks all activities that take place during a region split and maintains the state of the region in case of server failure.
HBase regions are equivalent to range partitions that are used in RDBMS sharding. Regions can be spread across many physical servers that consequently distribute the load, resulting in scalability.
In HBase, the most basic unit is a column. One or more columns form a row. Each row is addressed uniquely by a primary key referred to as a row key. A row in HBase can have millions of columns. Each column can have multiple versions with each distinct value contained in a separate cell.
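One way to picture this model is as nested maps: row key, then column, then timestamp. The Python sketch below is a toy illustration of that shape only. The class TinyTable is hypothetical and the column names reuse the 'cf:a' style from the shell examples; nothing here reflects how HBase stores data physically.

```python
from collections import defaultdict


class TinyTable:
    """Toy model: row key -> 'family:qualifier' -> {timestamp: value}."""

    def __init__(self):
        self.rows = defaultdict(lambda: defaultdict(dict))

    def put(self, row_key, column, value, timestamp):
        # Each write adds a new version (cell) instead of overwriting.
        self.rows[row_key][column][timestamp] = value

    def get(self, row_key, column):
        """Return the newest version of a cell, or None if absent."""
        versions = self.rows.get(row_key, {}).get(column)
        if not versions:
            return None
        return versions[max(versions)]


t = TinyTable()
t.put("row1", "cf:a", "value1", timestamp=1)
t.put("row1", "cf:a", "value1-updated", timestamp=2)
print(t.get("row1", "cf:a"))
```

Both versions of cf:a remain stored, and a read returns the one with the highest timestamp.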
One fundamental modeling concept in Apache HBase is that of a column family. A column family is a container for grouping sets of related data together within one table.
HBase groups columns with the same general access patterns and size characteristics into column families to form a basic unit of separation.
Column families allow you to fetch only those columns that are required by a query. All members of a column family are physically stored together on a disk. This means that optimization features, such as performance tunings, compression encodings, and so on, can be scoped at the column-family level.
For performance reasons, it is important to keep the number of column families in your Apache HBase schema low. More than three column families can potentially degrade performance. The recommended best practice is to keep a single column family in your schemas and introduce a second and third column family only if data access is limited to one column family at a time.
Note that Apache HBase does not impose any restrictions on row size.
Memory is the most restrictive element in Apache HBase. Performance-tuning techniques are focused on optimizing memory consumption.
From a schema design perspective, it is important to bear in mind that every cell is stored on disk fully qualified with its row key, column family, column name, and timestamp. If row and column names are long, the cell coordinates might become very large and take up more of the memory allotted to Apache HBase. This can have severe performance implications, especially if the dataset is large.
HBase supports built-in mechanisms to handle region splits and compactions. Split/compaction storms can occur when multiple regions grow at roughly the same rate, and eventually split at about the same time. This can cause a large spike in disk I/O because of the compactions needed to rewrite the split regions.
Rather than relying on Apache HBase to automatically split and compact the growing regions, you can perform these tasks manually. You can perform them in a time-controlled manner and stagger them across all regions to spread the I/O load as much as possible to avoid potential split/compaction storms. With the manual option, you can further alleviate any problematic split/compaction storms and gain additional performance.
A region can run hot when dealing with a write pattern that does not distribute load across all servers evenly. This is a common scenario in stream processing of events with time series data. Because time series keys increase steadily, all incoming data can end up being written to the same region.
This concentrated write activity on a single server can slow down the overall performance of the cluster. This is because inserting data is now bound to the performance of a single machine. This problem is easily overcome by employing key design strategies such as the following:
– Applying salting prefixes to keys; in other words, prepending a random number to a row key.
– Randomizing the key with a hash function.
– Promoting another field to prefix the row key.
These techniques can achieve a more evenly distributed load across all servers.
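As a small illustration of the prefixing idea, the Python sketch below derives the salt from a hash of the key instead of a truly random number, so a reader can recompute the prefix for a known key. The function name salted_key, the bucket count and the key format are all assumptions made for the example.

```python
import hashlib


def salted_key(row_key, buckets):
    """Prefix the row key with a bucket number derived from its hash."""
    digest = hashlib.md5(row_key.encode("utf-8")).digest()
    salt = digest[0] % buckets
    return f"{salt:02d}-{row_key}"


# Sequential timestamp keys would otherwise all land in the same region.
keys = [f"2015022807{minute:02d}00" for minute in range(60)]
salted = [salted_key(k, buckets=8) for k in keys]
print(salted[:3])
print(sorted({s.split("-")[0] for s in salted}))
```

The trade-off is on the read side: a range scan over a time window now has to query every bucket and merge the results.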
Client API Considerations:
There are a number of optimizations to take into consideration when reading or writing data from a client using the Apache HBase API. For example, when performing a large number of PUT operations, you can disable the auto-flush feature. Otherwise, the PUT operations will be sent one at a time to the region server.
Whenever you use a scan operation to process large numbers of rows, use filters to limit the scan scope. Using filters can potentially improve performance. This is because column over-selection can incur a nontrivial performance penalty, especially over large data sets.
As a recommended best practice, set the scanner-caching to a value greater than the default of 1, especially if Apache HBase serves as an input source for a MapReduce job.
Setting the scanner-caching value to 500, for example, will transfer 500 rows at a time to the client to be processed, but this might potentially cost more in memory consumption.
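The effect of scanner caching is mostly a matter of round trips, which a quick back-of-the-envelope calculation in Python makes visible. The function name and the row count are assumptions for the example, and the model ignores everything except the number of fetch calls.

```python
import math


def scan_round_trips(total_rows, scanner_caching):
    """Number of fetch calls needed to stream all rows to the client."""
    return math.ceil(total_rows / scanner_caching)


# Scanning one million rows with two different caching values.
print(scan_round_trips(1_000_000, 1))
print(scan_round_trips(1_000_000, 500))
```

Going from a caching value of 1 to 500 cuts one million fetch calls down to 2,000, at the cost of holding up to 500 rows in client memory at a time.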
Data compression is an important consideration in Apache HBase production workloads. Apache HBase natively supports a number of compression algorithms that you can enable at the column family level.
Enabling compression generally yields better performance, because the compute resources needed for compression and decompression are typically less than the overhead of reading more data from disk.
HBase is typically deployed on top of the Hadoop Distributed File System (HDFS), which provides a scalable, persistent, storage layer.
Apache ZooKeeper is a critical component for maintaining configuration information and managing the entire Apache HBase cluster.
The three major Apache HBase components are the following:
– Client API
– Master server
– Region servers
HBase stores data in indexed store files called HFiles on HDFS. The store files are sequences of blocks with a block index stored at the end for fast lookups.
The store files provide an API to access specific values as well as to scan ranges of values, given a start and end key.
During a write operation, data is first written to a commit log called a write-ahead-log (WAL) and then moved into memory in a structure called Memstore.
When the size of the Memstore exceeds a given maximum value, it is flushed as an HFile to disk. Each time data is flushed from Memstores to disk, new HFiles must be created. As the number of HFiles builds up, a compaction process merges the files into fewer, larger files.
A read operation essentially is a merge of data stored in the Memstores and in the HFiles.
The WAL is never used in the read operation. It is meant only for recovery purposes if a server crashes before writing the in-memory data to disk.
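The write and read paths described above can be condensed into a toy Python model. This is a sketch of the concept, not of HBase internals: the class TinyStore is hypothetical, the Memstore limit is counted in entries rather than bytes, store files are plain dictionaries, and the WAL is never truncated.

```python
class TinyStore:
    """Toy model of the WAL -> Memstore -> HFile write path."""

    def __init__(self, memstore_limit):
        self.memstore_limit = memstore_limit  # assumed: counted in entries
        self.wal = []        # append-only log, used only for crash recovery
        self.memstore = {}   # newest values, held in memory
        self.hfiles = []     # immutable flushed files, oldest first

    def put(self, key, value):
        self.wal.append((key, value))   # 1. log the write
        self.memstore[key] = value      # 2. apply it in memory
        if len(self.memstore) >= self.memstore_limit:
            self.flush()                # 3. flush when the limit is reached

    def flush(self):
        # Each flush creates a new sorted store file.
        self.hfiles.append(dict(sorted(self.memstore.items())))
        self.memstore = {}

    def get(self, key):
        # A read merges the Memstore with the store files, newest first.
        if key in self.memstore:
            return self.memstore[key]
        for hfile in reversed(self.hfiles):
            if key in hfile:
                return hfile[key]
        return None

    def compact(self):
        # Merge all store files into one; newer values win.
        merged = {}
        for hfile in self.hfiles:
            merged.update(hfile)
        self.hfiles = [dict(sorted(merged.items()))]


store = TinyStore(memstore_limit=2)
for key, value in [("a", 1), ("b", 2), ("a", 3), ("c", 4)]:
    store.put(key, value)
print(len(store.hfiles), store.get("a"))
store.compact()
print(store.hfiles)
```

Note that get never consults the WAL, matching the description above: the log exists only so that unflushed writes can be replayed after a crash.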
A region in Apache HBase acts as a store per column family. Each region contains contiguous ranges of rows stored together.
Regions can be merged to reduce the number of store files. A large store file that exceeds the configured maximum store file size can trigger a region split.
A region server can serve multiple regions. Each region is mapped to exactly one region server. Region servers handle reads and writes, as well as keep data in-memory until enough is collected to warrant a flush.
Clients communicate directly with region servers to handle all data-related operations.
The master server is responsible for monitoring and assigning regions to region servers and uses Apache ZooKeeper to facilitate this task.
Apache ZooKeeper also serves as a registry for region servers and a bootstrap location for region discovery.
The master server is also responsible for handling critical functions such as load balancing of regions across region servers, region server failover, and completing region splits, but it is not part of the actual data storage or retrieval path.