Hadoop by itself allows you to store and process
very large volumes of data. However, building a large-scale distributed
system can require functionality not provided by this base. Several
other tools and systems have been created to fill the gaps and deliver
a more full-featured set of distributed systems engineering tools.
- Introduction
- Goals for this Module
- Outline
- ZooKeeper
  - Motivation
  - Data Storage in ZooKeeper
  - ZooKeeper Applications
  - Distributed Consensus
- Pig
  - Motivation
  - Pig Latin
  - Pig Latin Data Types
  - Loading Data Into Pig
  - Pig Latin Operators
  - Setting Up Pig
- References
Suppose you are building a large-scale
distributed system. Several different services need to be brought
online and must discover one another. It is not guaranteed that each
service will have a fixed master IP address. For example, you may start
the same service on 100 nodes, and whichever of the 100 boots first is
elected master. Each of these
disparate services must communicate with each other. How do all the
nodes of each service find the master IP address of each other service?
How do all the nodes in a single service agree on which one of them
becomes the master?
ZooKeeper is a service designed to handle
all of these problems. ZooKeeper will allow you to store small amounts
of information in a central location. It provides coordinated access to
this information. Most importantly, it provides high availability: the
ZooKeeper service is intended to run on a set of several machines,
which prevents the loss of individual nodes from bringing down the
cluster. But these nodes communicate information in a careful way,
ensuring that all nodes in the ZooKeeper cluster provide the same
consistent answer for a query, regardless of which ZooKeeper server you
contact.
Several ZooKeeper daemons can be started on
different machines. Clients can connect to any daemon in the cluster;
the clients will always see the same view of the ZooKeeper world
regardless of which daemon they connect to. User data is stored in
objects with a hierarchical addressing system similar to that used by a
conventional file system. It has a root object named /, and
additional nodes can be extended off of this in a tree-like fashion.
Each node of the tree can both hold data (i.e., act like a file) and
have child nodes (i.e., act like a directory). The amount of data that
can be stored in an object is small: there is a hard cap of 1 MB. The
reason for the cap is so that the entire data store can be stored in
the RAM of the ZooKeeper machines. This allows requests to be
dispatched with high throughput. Changes are written to disk to provide
permanence, but read requests are entirely handled by the data cached
in memory. This is usually not a major limitation; the data stored at a
node is intended to be used as a pointer. For example, ZooKeeper may
know about the filename in another conventional file system, which
contains the authoritative configuration file for a distributed system.
The distributed system components first contact ZooKeeper to get the
definitive filename, and then fetch that file for the configuration.
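The data model described above can be pictured with a small in-memory sketch. This is only a toy model written for illustration, not the ZooKeeper client API: the class name NodeTreeSketch, its methods, and the file name stored in /config are all assumptions. It shows nodes that hold data and have children at the same time, and the 1 MB limit on the data held by one node.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Toy in-memory model of a hierarchical node store; not the ZooKeeper client API. */
final class NodeTreeSketch {
    static final int MAX_DATA_BYTES = 1024 * 1024; // the 1 MB per-node cap

    // Full path -> data held at that node. Any node may hold data AND have children.
    private final TreeMap<String, byte[]> nodes = new TreeMap<>();

    NodeTreeSketch() {
        nodes.put("/", new byte[0]); // the root object
    }

    void create(String path, byte[] data) {
        if (!path.startsWith("/") || (path.length() > 1 && path.endsWith("/"))) {
            throw new IllegalArgumentException("bad path: " + path);
        }
        if (data.length > MAX_DATA_BYTES) {
            throw new IllegalArgumentException("data exceeds the 1 MB cap");
        }
        if (nodes.containsKey(path)) {
            throw new IllegalStateException("node already exists: " + path);
        }
        if (!nodes.containsKey(parentOf(path))) {
            throw new IllegalStateException("parent does not exist: " + path);
        }
        nodes.put(path, data.clone());
    }

    byte[] getData(String path) {
        byte[] data = nodes.get(path);
        if (data == null) {
            throw new IllegalStateException("no such node: " + path);
        }
        return data.clone();
    }

    /** Names (not full paths) of the direct children of a node. */
    List<String> getChildren(String path) {
        String prefix = path.equals("/") ? "/" : path + "/";
        List<String> children = new ArrayList<>();
        for (String candidate : nodes.keySet()) {
            boolean below = candidate.startsWith(prefix) && candidate.length() > prefix.length();
            if (below && candidate.indexOf('/', prefix.length()) < 0) {
                children.add(candidate.substring(prefix.length()));
            }
        }
        return children;
    }

    private static String parentOf(String path) {
        int lastSlash = path.lastIndexOf('/');
        return lastSlash == 0 ? "/" : path.substring(0, lastSlash);
    }

    public static void main(String[] args) {
        NodeTreeSketch tree = new NodeTreeSketch();
        // Hypothetical pointer: the node stores only the NAME of the real config file.
        tree.create("/config", "/shared/conf/app.xml".getBytes(StandardCharsets.UTF_8));
        tree.create("/config/version", "3".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(tree.getData("/config"), StandardCharsets.UTF_8));
        System.out.println(tree.getChildren("/config"));
    }
}
```

Here /config acts like a file (it holds the pointer) and like a directory (it has the child named version).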
ZooKeeper can be used for a variety of distributed
coordination tasks. In addition to leader election, system
bootstrapping, and various types of locks (mutual exclusion,
reader/writer, etc.), other synchronization primitives such as barriers,
producer/consumer queues, priority queues, and multi-phase commit
operations can be encoded in ZooKeeper. The ZooKeeper tutorial and
recipes pages describe how to implement these algorithms. ZooKeeper itself is
implemented in Java, but provides APIs for both Java- and C-based
programs.
ZooKeeper can also be used as a central message board for an
application. Individual nodes of a distributed system can store their
current operational status in ZooKeeper for easy central reporting. The
ZooKeeper service can also be used to form sub-groups of nodes or other
hierarchical arrangements within a distributed system.
As mentioned, data stored in ZooKeeper is accessed by manipulating
the nodes in the data hierarchy. This is done in a manner similar to
file system access. But ZooKeeper does not implement the POSIX file
system API. On the other hand, it also adds a set of other primitives
not ordinarily found in a file system. Nodes can be opened with a
number of special flags. One such flag is "ephemeral," meaning that the
node disappears when the client who opened it disconnects. Another such
flag is "sequence," which means that ZooKeeper will append a sequential
id number to the node name you are trying to create. These id numbers
are handed out in order, and the same id number is not reused.
ZooKeeper does not provide exclusive locks on nodes directly, but a
lock can be created by careful use of the ephemeral and sequence flags.
The ZooKeeper recipes wiki page describes how to implement global locks
using these flags. It also describes protocols for implementing shared
(reader-writer) and revocable locks.
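To make the idea concrete, here is a single-process simulation of how the two flags can combine into a lock. It is a sketch under stated assumptions, not ZooKeeper code: the class LockRecipeSketch and its methods are hypothetical, and a real client would also set watches rather than poll. Each contender creates an ephemeral, sequential node; whoever owns the lowest-numbered node holds the lock, and the lock passes on when the holder disconnects.

```java
import java.util.TreeMap;

/** Single-process simulation of the "ephemeral" and "sequence" flags; not the ZooKeeper API. */
final class LockRecipeSketch {
    private int nextSequence = 0; // handed out in order, never reused

    // Child node name -> id of the session that created it. Names share one prefix,
    // so the zero-padded sequence suffix sorts the oldest request first.
    private final TreeMap<String, String> children = new TreeMap<>();

    /** "Sequence" flag: append the next id number to the requested name. */
    String createEphemeralSequential(String prefix, String sessionId) {
        String name = String.format("%s%010d", prefix, nextSequence++);
        children.put(name, sessionId);
        return name;
    }

    /** "Ephemeral" flag: a session's nodes disappear when it disconnects. */
    void disconnect(String sessionId) {
        children.values().removeIf(sessionId::equals);
    }

    /** The session owning the lowest-numbered node holds the lock (null if none). */
    String lockHolder() {
        return children.isEmpty() ? null : children.firstEntry().getValue();
    }

    public static void main(String[] args) {
        LockRecipeSketch lock = new LockRecipeSketch();
        lock.createEphemeralSequential("lock-", "sessionA");
        lock.createEphemeralSequential("lock-", "sessionB");
        System.out.println("holder: " + lock.lockHolder());
        lock.disconnect("sessionA"); // the holder goes away, so its node vanishes
        System.out.println("holder: " + lock.lockHolder());
    }
}
```

Because the node is ephemeral, a crashed lock holder cannot leave the lock stuck; because the ids are sequential, waiting clients are served in arrival order.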
A reasonable question is how the ZooKeeper
service can function across multiple nodes and remain synchronized. If
distributed synchronization is why your services must use ZooKeeper,
how does ZooKeeper itself bootstrap this capability?
ZooKeeper implements a distributed consensus
protocol. ZooKeeper internally uses its own atomic broadcast protocol,
Zab, which is similar in spirit to Paxos and includes a leader election
step that determines which node in the ZooKeeper service is the master.
Clients may connect to any node in the ZooKeeper service; the
non-master nodes relay agreed-upon facts back to their
clients. Updates to the shared state require the intervention of the
master. All updates to the shared state are ordered with timestamps.
These timestamped updates are then disseminated to the nodes in the
ZooKeeper service. When a majority of nodes acknowledge an update, it
is said to be held by a quorum of the nodes. Any fact that a
quorum has agreed upon may be returned to clients. Conversely, any
updates that have not reached a quorum will not be returned to the
clients. The timestamps are used to order the updates to elements of
the data store. If multiple updates are made to the state of an
individual node, the newest update is used.
The use of a quorum ensures that the service
always returns consistent answers. Because a vote is effectively held
before returning a response, any nodes which hold stale data will be
outvoted by the nodes with more current information. This also makes
ZooKeeper resilient to failure. Fewer than half of the ZooKeeper service
nodes can shut down or become desynchronized without ZooKeeper losing its
ability to answer requests authoritatively. So if 11 nodes run
ZooKeeper, up to 5 of these may disconnect without incident. Once a
majority of the nodes is no longer available, ZooKeeper will refuse
service until the machines are restored.
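The arithmetic behind these numbers is short enough to write down. The helper below is an illustrative sketch (the class and method names are assumptions, not part of ZooKeeper): a quorum is a strict majority of the ensemble, and the number of tolerable failures is whatever is left over.

```java
/** Majority-quorum arithmetic; an illustrative helper, not part of ZooKeeper. */
final class QuorumMath {
    /** Smallest number of nodes that forms a strict majority. */
    static int quorumSize(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    /** How many nodes may fail while a quorum can still be formed. */
    static int tolerableFailures(int ensembleSize) {
        return ensembleSize - quorumSize(ensembleSize);
    }

    /** True if the nodes still alive are enough to answer authoritatively. */
    static boolean hasQuorum(int ensembleSize, int aliveNodes) {
        return aliveNodes >= quorumSize(ensembleSize);
    }

    public static void main(String[] args) {
        int n = 11;
        System.out.println("quorum size for " + n + " nodes: " + quorumSize(n));
        System.out.println("tolerable failures: " + tolerableFailures(n));
    }
}
```

For 11 nodes the quorum is 6, so 5 failures are tolerated. An even-sized ensemble gains nothing over the next smaller odd size: 10 nodes also need 6 for a quorum and so tolerate only 4 failures.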
If the node of the ZooKeeper cluster which was
elected leader fails, then a new leader election will be held and the
cluster will continue to function.
The reason for electing a leader in such a
system is to ensure that timestamps assigned to updates are only issued
by a single authority. ZooKeeper is designed to reduce or eliminate
possible race conditions in distributed applications.
One consequence of ZooKeeper's design is that it
is intended to serve many more read requests than writes. A ZooKeeper
cluster can handle hundreds or thousands of clients issuing many tens
of thousands of requests per second, provided that the great majority of
these requests (90 to 99%) are reads. Only a small fraction should be updates.
The following code excerpt shows how to use
ZooKeeper to implement a "barrier." A barrier separates a process into
two logical halves. Multiple machines running in coordination with one
another will all perform the first half of the process. No machine can
begin the second half of the process until everyone has completed the
first half. The barrier sits between these two halves. As nodes reach
the barrier, they all wait until everyone has reached the barrier. Then
all nodes are released to begin the second half. A distributed barrier
implementation written for ZooKeeper follows:
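import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

/*
 * Usage (hosts, the barrier path, the participant name, and the size
 * are placeholders chosen for illustration):
 *
 *   Watcher watcher = new Watcher() {
 *     public void process(WatchedEvent event) {}
 *   };
 *   ZooKeeper zk = new ZooKeeper(hosts, 3000, watcher);
 *   Barrier b = new Barrier(zk, "/barrier", "worker-1", 3);
 *   b.enter();
 *   // work with everyone
 *   b.leave();
 */
public class Barrier {
    private final ZooKeeper zk;
    private final String root;  // path of the barrier node
    private final String name;  // name of this participant's child node
    private final int size;     // number of participants to wait for
    private final Object notifyObject = new Object();

    // Wakes any thread blocked in enter() or leave() when the children change.
    // notifyAll() must be called while holding the monitor of notifyObject.
    private final Watcher childWatcher = new Watcher() {
        public void process(WatchedEvent event) {
            synchronized (notifyObject) {
                notifyObject.notifyAll();
            }
        }
    };

    Barrier(ZooKeeper zk, String root, String name, int size)
            throws KeeperException, InterruptedException {
        this.zk = zk;
        this.root = root;
        this.name = name;
        this.size = size;
        // Make sure the barrier node exists
        try {
            zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException e) {
            // Another participant created it first; nothing to do.
        }
    }

    /**
     * Join barrier: blocks until `size` participants have entered.
     * @return true once everyone has reached the barrier
     * @throws KeeperException
     * @throws InterruptedException */
    boolean enter() throws KeeperException, InterruptedException {
        // Ephemeral: the node disappears if this participant disconnects.
        zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        while (true) {
            synchronized (notifyObject) {
                // Re-register the watch on every pass; watches fire only once.
                List<String> list = zk.getChildren(root, childWatcher);
                if (list.size() < size) {
                    notifyObject.wait();
                } else {
                    return true;
                }
            }
        }
    }

    /**
     * Leave barrier: blocks until all participants have left.
     * @return true once the barrier node has no children
     * @throws KeeperException
     * @throws InterruptedException */
    boolean leave() throws KeeperException, InterruptedException {
        // Version 0: this node's data was never modified after creation.
        zk.delete(root + "/" + name, 0);
        while (true) {
            synchronized (notifyObject) {
                List<String> list = zk.getChildren(root, childWatcher);
                if (list.size() > 0) {
                    notifyObject.wait();
                } else {
                    return true;
                }
            }
        }
    }
}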
Listing 6.1: ZooKeeper Barrier Example
Pig is a platform for analyzing large data sets.
Pig's language, Pig
Latin, lets you specify a sequence of data transformations such as
merging data sets, filtering them, and applying functions to records or
groups of records. Users can create their own functions to do
special-purpose processing.
Pig Latin programs execute in a distributed
fashion on a cluster. Our
current implementation compiles Pig Latin programs into Map/Reduce
jobs,
and executes them using Hadoop on Kryptonite.
The purpose of Pig is to answer queries over
semi-structured data. Large volumes of data are kept in
mostly-organized formats such as log files, which define a set of
standard fields for each entry. While the MapReduce programming model
on top of Hadoop provides a convenient mechanism for delivering a large
volume of log-structured information to an analysis program, writing
analyses of mostly-structured information involves writing a large
amount of tedious processing code.
Pig is a high-level language for writing queries
over this sort of data. A query planner then compiles queries written
in this language (called "Pig Latin") into maps and reduces which are
then executed on a Hadoop cluster.
Anything which could be written in Pig can also
be implemented as straight Java-based Hadoop MapReduce. But doing so
requires each programmer to develop and test their own (non-standard)
library of data analysis functions such as join, order by, etc.
Pig provides a tested and supported suite of the most
common data-aggregation functions. It also allows programmers to
provide their own application-specific code for purposes of loading and
saving data, as well as for performing more complicated
record-by-record evaluations.
The programming language used to write Pig
queries is called Pig Latin.
The MapReduce programming model can be thought of
as composed of three distinct phases:
- Process input records
- Form groups of related records
- Process groups into outputs
In MapReduce, the first two of these steps are
handled by the mapper, and the third step is handled by the reducer.
Pig Latin exposes explicit primitives that perform actions from each
phase. These primitives can be composed and reordered. Furthermore, it
includes additional primitives to handle operations such as filtering
and joining data sets.
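The three phases can be sketched in plain Java, without any Hadoop or Pig classes. Everything here is an assumption made for illustration: the class name, the tab-separated input format, and the choice of "queries per user" as the computation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** The three phases written out in plain Java; no Hadoop or Pig classes are used. */
final class ThreePhaseSketch {
    /** Counts queries per user from lines of the form userId TAB queryString TAB timestamp. */
    static Map<String, Integer> queriesPerUser(List<String> logLines) {
        // Phase 1: process input records into (key, value) pairs.
        List<String[]> pairs = new ArrayList<>();
        for (String line : logLines) {
            String[] fields = line.split("\t");
            pairs.add(new String[] {fields[0], fields[1]}); // (userId, queryString)
        }

        // Phase 2: form groups of related records (here: same userId).
        Map<String, List<String>> groups = new TreeMap<>();
        for (String[] pair : pairs) {
            groups.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
        }

        // Phase 3: process each group into an output record.
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<String>> group : groups.entrySet()) {
            counts.put(group.getKey(), group.getValue().size());
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        lines.add("alice\tcats\t100");
        lines.add("bob\tdogs\t101");
        lines.add("alice\tfish\t102");
        System.out.println(queriesPerUser(lines));
    }
}
```

In Pig Latin each of these loops corresponds to a primitive that can be written, composed, and reordered independently of the others.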
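Values in Pig Latin can be expressed by four
basic data types:
- An atom is a single atomic value (e.g., fish).
- A tuple is a record made up of several fields (e.g., the userId, queryString, and timestamp of one log entry).
- A data bag is a collection of values, such as the set of records returned by a LOAD statement.
- A data map is a collection that looks up values by key.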
All data types are fully nestable; bags may
contain tuples, and maps may contain bags or other maps, etc. This
differs from a traditional database model, where data must be
normalized into lists of atoms. By allowing data types to be composed
in this manner, Pig queries line up better to the conceptual model of
the data held by the programmer. Data types may also be heterogeneous.
For example, the fields of a tuple may each have different types; some
may be atoms, others may be more tuples, etc. The values in a bag may
hold different types, as may the values in data maps. These can vary
from one record to the next in the bag. Data map keys must be atoms,
for efficiency reasons.
The first step in using Pig is to load data into
a program. Pig provides a LOAD statement for this purpose.
Its format is: result = LOAD 'filename' USING fn()
AS (field1, field2, ...).
This statement returns a bag of values of all the
data contained in the named file. Each record in the bag is a tuple,
with the fields named by field1, field2, etc. The fn()
is a user-provided function that reads in the data. Pig supports
user-provided Java code throughout to handle the application-specific
bits of parsing. Pig Latin itself is the "glue" that then holds these
application-specific functions together, routing records and other data
between them.
An example data loading command (taken from the Pig Latin
paper by Olston et al. listed in the references) is:
queries = LOAD 'query_log.txt'
USING myLoad()
AS (userId, queryString, timestamp)
The user-defined functions to load data (e.g., myLoad()) do
not need to be provided. A default function for loading data exists,
which will parse tab-delimited records. If the programmer did not
specify field names in the AS clause, they would be addressed
by positional parameters: $0, $1, and so forth.
The default loader is called PigStorage(). This loader can
read files containing character-delimited tuple records. These tuples
must contain only atomic values; e.g., cat, turtle, fish.
Other loaders are listed in the PigBuiltins page of
the Pig wiki. PigStorage() takes as an argument the character
to use to delimit fields. For example, to load a table of three
tab-delimited fields, the following statement can be used:
data = LOAD 'tab_delim_data.txt' USING PigStorage('\t') AS (user, time, query)
A different argument could be passed to PigStorage() to
read comma- or space-delimited fields.
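What a character-delimited loader has to do can be sketched in a few lines of Java. This is not the PigStorage implementation; the class DelimitedLoaderSketch and its two helper methods are hypothetical and only imitate the behavior described above: split each line on one delimiter character, then address fields either by position or by the names given in the AS clause.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Imitates a character-delimited loader; hypothetical helper, not Pig's PigStorage. */
final class DelimitedLoaderSketch {
    /** Split one line into a tuple of atomic string fields. */
    static List<String> parseTuple(String line, char delimiter) {
        // Pattern.quote stops characters such as '|' or '.' from acting as regex syntax;
        // the -1 limit keeps trailing empty fields instead of dropping them.
        return Arrays.asList(line.split(Pattern.quote(String.valueOf(delimiter)), -1));
    }

    /** Look a field up by the name it was given in the AS clause. */
    static String field(List<String> tuple, List<String> schema, String name) {
        int position = schema.indexOf(name);
        if (position < 0) {
            throw new IllegalArgumentException("unknown field: " + name);
        }
        return tuple.get(position);
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList("user", "time", "query");
        List<String> tuple = parseTuple("alice\t1200\tpig latin", '\t');
        System.out.println(tuple.get(0));                  // positional access, like $0
        System.out.println(field(tuple, schema, "query")); // named access
    }
}
```

Passing ',' or ' ' instead of '\t' handles comma- or space-delimited input in the same way.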
Pig Latin provides a number of operators which
filter, join, or otherwise organize data.
FOREACH: The FOREACH command
operates on each element of a data bag. This is useful, for instance,
for processing each input record in a bag returned by a LOAD
statement.
This statement iterates over the contents of a
bag. It applies the expressions on the right of the GENERATE
keyword to the data provided by the current record emitted from the
bag. The expressions may be, for example, the names of fields. So to
extract the names of all users who accessed the site (based on the query_log.txt
example shown above), we could write a query like:
FOREACH queries GENERATE userId;
In the FOREACH statement, each element
of the bag is considered independently. There are no expressions which
reference multiple elements being extracted from the bag's iterator at
a time; this allows the statement to be processed in parallel using
Hadoop MapReduce.
Expressions emitted by the GENERATE
element are not limited to the names of fields; they can be fields (by
name like userId or by position like $0),
constants, algebraic operations, map lookups, conditional expressions,
or FLATTEN expressions, described below.
Finally, these expressions may also call
user-provided functions that are written in Java. These user-provided
functions have access to the entire current record through a Pig
library; in this way, Pig can be used as the heavy-lifting component to
automate record-by-record mapping using an application-specific Java
function to perform tricky parsing or evaluation logic. Pig also
provides several of the most commonly-needed functions, such as COUNT,
AVG, MIN, MAX, and SUM.
FLATTEN is an expression which will
eliminate a level of nesting. Given a tuple which contains a bag, FLATTEN
will emit several tuples each of which contains one record from the
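bag. For example, if we had a bag of records containing a person's name
and a list of types of pets they own:
(Alice, {turtle, goldfish, cat})
(Bob, {dog, cat})
A FLATTEN command would eliminate the
inner bags like so:
(Alice, turtle)
(Alice, goldfish)
(Alice, cat)
(Bob, dog)
(Bob, cat)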
FILTER statements iterate over a bag and
return a new bag containing all elements which pass a conditional
expression, e.g.:
The COGROUP and JOIN operations
perform similar functions: they unite related data elements from
multiple data sets. The difference is that JOIN acts like the
SQL JOIN statement, creating a flat set of output records containing
the joined cross-product of the input records. The COGROUP
operator, on the other hand, groups the elements by their common field
and returns a set of records each containing two separate bags. The
first bag is the records of the first data set with the common field,
and the second bag is the records of the second data set containing the
common field.
To illustrate the difference, suppose we had the
flattened data set mapping people to their pets, and another flattened
data set mapping people to their friends. We could create a "pets of
friends" data set out of these like the following. Here are the input
data sets:
pets: (owner, pet)
----------------------
(Alice, turtle)
(Alice, goldfish)
(Alice, cat)
(Bob, dog)
(Bob, cat)
friends: (friend1, friend2)
----------------------
(Cindy, Alice)
(Mark, Alice)
(Paul, Bob)
Here is what is returned by COGROUP:
COGROUP pets BY owner, friends BY friend2; returns:
( Alice, {(Alice, turtle), (Alice, goldfish), (Alice, cat)},
{(Cindy, Alice), (Mark, Alice)} )
( Bob, {(Bob, dog), (Bob, cat)}, {(Paul, Bob)} )
Contrasted with the more familiar,
non-hierarchical JOIN operator:
JOIN pets BY owner, friends BY friend2; returns:
(Alice, turtle, Cindy)
(Alice, turtle, Mark)
(Alice, goldfish, Cindy)
(Alice, goldfish, Mark)
(Alice, cat, Cindy)
(Alice, cat, Mark)
(Bob, dog, Paul)
(Bob, cat, Paul)
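The relationship between the two operators can be sketched in plain Java: a JOIN is what you get by taking the COGROUP result and forming the cross-product of the two bags for each key. The class below is an illustration only (its names are assumptions, and Pig's own execution strategy is not shown). Note that this sketch keeps every field of both input records, so each joined row has four fields, whereas the listing above shows the shared key only once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** COGROUP and JOIN over in-memory records; an illustration, not Pig's implementation. */
final class CogroupJoinSketch {
    /** One COGROUP output record: the two bags that share a key. */
    static final class Group {
        final List<List<String>> first = new ArrayList<>();
        final List<List<String>> second = new ArrayList<>();
    }

    /** Groups both data sets by key, keeping their records in separate bags. */
    static Map<String, Group> cogroup(List<List<String>> a, int keyA,
                                      List<List<String>> b, int keyB) {
        Map<String, Group> groups = new TreeMap<>();
        for (List<String> record : a) {
            groups.computeIfAbsent(record.get(keyA), k -> new Group()).first.add(record);
        }
        for (List<String> record : b) {
            groups.computeIfAbsent(record.get(keyB), k -> new Group()).second.add(record);
        }
        return groups;
    }

    /** JOIN expressed as COGROUP followed by a per-key cross-product of the bags. */
    static List<List<String>> join(List<List<String>> a, int keyA,
                                   List<List<String>> b, int keyB) {
        List<List<String>> rows = new ArrayList<>();
        for (Group group : cogroup(a, keyA, b, keyB).values()) {
            for (List<String> left : group.first) {
                for (List<String> right : group.second) {
                    List<String> row = new ArrayList<>(left);
                    row.addAll(right); // keeps all fields, including the repeated key
                    rows.add(row);
                }
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<List<String>> pets = Arrays.asList(
            Arrays.asList("Alice", "turtle"), Arrays.asList("Alice", "goldfish"),
            Arrays.asList("Alice", "cat"), Arrays.asList("Bob", "dog"),
            Arrays.asList("Bob", "cat"));
        List<List<String>> friends = Arrays.asList(
            Arrays.asList("Cindy", "Alice"), Arrays.asList("Mark", "Alice"),
            Arrays.asList("Paul", "Bob"));
        // pets BY owner (field 0), friends BY friend2 (field 1)
        for (List<String> row : join(pets, 0, friends, 1)) {
            System.out.println(row);
        }
    }
}
```

A key that appears in only one data set yields an empty bag on the other side in cogroup, and therefore contributes no rows to join.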
In general, the COGROUP command supports
grouping on as many data sets as are desired. Three or more data sets
can be joined in this fashion. It is also possible to group up elements
of only a single data set; this is supported through an alternate
keyword, GROUP.
A GROUP ... BY statement will organize
a bag of records into bags of related items based on the field
identified as their common key field. For example, the pets bag from
the previous example could be grouped up with:
GROUP pets BY owner; returns:
( Alice, {(Alice, turtle), (Alice, goldfish), (Alice, cat)} )
( Bob, {(Bob, dog), (Bob, cat)} )
In this way, GROUP and FLATTEN
are effectively inverses of one another.
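The inverse relationship can be checked with a small round trip in plain Java. This is a sketch under stated assumptions: the class and method names are hypothetical, records are modeled as lists of strings, and the two results are only identical in order because records sharing a key are adjacent in the input (in general they match up to ordering).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** GROUP and FLATTEN as a round trip over in-memory records; not Pig's implementation. */
final class GroupFlattenSketch {
    /** GROUP ... BY: key -> bag of the full records that share that key. */
    static Map<String, List<List<String>>> group(List<List<String>> records, int keyIndex) {
        // LinkedHashMap keeps keys in first-seen order.
        Map<String, List<List<String>>> groups = new LinkedHashMap<>();
        for (List<String> record : records) {
            groups.computeIfAbsent(record.get(keyIndex), k -> new ArrayList<>()).add(record);
        }
        return groups;
    }

    /** FLATTEN: remove one level of nesting by emitting every record of every bag. */
    static List<List<String>> flatten(Map<String, List<List<String>>> groups) {
        List<List<String>> records = new ArrayList<>();
        for (List<List<String>> bag : groups.values()) {
            records.addAll(bag);
        }
        return records;
    }

    public static void main(String[] args) {
        List<List<String>> pets = Arrays.asList(
            Arrays.asList("Alice", "turtle"), Arrays.asList("Alice", "goldfish"),
            Arrays.asList("Alice", "cat"), Arrays.asList("Bob", "dog"),
            Arrays.asList("Bob", "cat"));
        Map<String, List<List<String>>> grouped = group(pets, 0);
        System.out.println(grouped);
        System.out.println(flatten(grouped).equals(pets));
    }
}
```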
More complicated statements can be realized as
well: operations which expect a data set as input do not need to use an
explicitly-named data set; they can use one generated "inline" with
another FILTER, GROUP or other statement.
When the final data set has been created by a
Pig Latin script, the output can be saved to a file with the STORE
command, which follows the form: STORE result INTO 'filename' USING fn().
The provided function specifies how to serialize
the data to the file; if it is omitted, then a default serializer will
write plain-text tab-delimited files.
A number of additional operators exist for the
purposes of removing duplicate records, sorting records, etc. The Pig
Latin paper listed in the references explains the additional operators
and expression syntaxes in greater detail.
Pig is an Apache incubator project; it has not
made official packaged releases, but the source code can be retrieved
from the project's Subversion repository.
The Pig Incubator home page contains instructions on retrieving the Pig
sources and compiling them.
- Pig Tutorial: included in the Pig package to give users hands-on help
- Olston, C., Reed, B., Srivastava, U., et al. Pig Latin: A Not-So-Foreign Language for Data Processing. In Proceedings of the ACM SIGMOD 2008 International Conference on Management of Data. Vancouver, Canada, June 2008.
- Pig Homepage
- Pig Incubator Site (contains source code and setup instructions)
- ZooKeeper Guide - The ZooKeeper manual
- ZooKeeper Homepage
- ZooKeeper Wiki