# GraphX: Graph Processing in a Distributed Dataflow Framework

15 May 2018

The **GraphX** system distills the common operations of specialized graph systems into a general dataflow framework, with the aim of unifying computation on tables and graphs. Data that a general dataflow framework processes as collections has to be viewable as a graph, without duplication or data movement. A second motivation was to add graph processing to the Spark ecosystem, so that a whole data analytics pipeline can be executed in a single system.

Modern analytics tasks explore two views of the data, as a table and as a graph:

- Pipeline 1: Raw Wikipedia data -> Link table (between articles) -> Hyperlinks -> Page Rank -> Top 20 Pages (on Wikipedia - a result stored as a materialized view in a database)
- Pipeline 2: Raw Wikipedia data -> Discussion table (who contributed to which articles) -> Editor graph -> Community detection (run a graph algorithm) -> User Community (table with each user labeled with a community)
- Pipeline 3: combine Page Rank and Community Detection (algorithms on graphs) -> Top Communities (on Wikipedia)

Separate systems for two views of the data:

- Table - in dataflow systems such as Spark or Hadoop (scalable and general purpose)
- Graphs - specialized graph processing systems: Pregel, GraphLab, Giraph (graph structure computation and iterative algorithms).

Problems with combining separate systems:

- difficult to manage and leads to creating brittle interfaces
- inefficient - extensive data movement and duplication across the network and file system; limited reuse of internal data structures across stages

The GraphX idea is to unify the table and graph views in a library on top of the single physical computation model exposed by the Spark dataflow framework, enabling a single system to support the entire pipeline easily and efficiently.

It is not possible to simply throw away the specialized graph systems and do everything in Hadoop. For example, PageRank (10 iterations) on the LiveJournal graph takes 1340 seconds on Hadoop but only 22 seconds on GraphLab, so Hadoop is about 60x slower than GraphLab. On Spark it runs in 354 seconds (about 16x slower than GraphLab).

The key challenge is to express and efficiently execute graph computation in a general-purpose distributed dataflow framework:

- express graph representation as collections (and tables)
- optimize the graph algorithms for the dataflow model
- execute at big scale

- The ideas of Gonzalez, especially scatter-gather, are cast neatly as MapReduce jobs: mapping (scattering) over the triplets yields (vertex, message) pairs, which are then reduced (gathered) with message combiners, for example to sum the messages for a particular vertex.
- Using the basic GraphX operators (a set of graph primitives built on top of Spark), Pregel and GraphLab can be implemented in under 50 lines of Scala code.
- It brings more scalability to graph processing, where scalability means capacity and robustness (not necessarily performance).

- The data sets above are not small (billions of edges), but they can be processed on a laptop, much faster than on the distributed systems.
- GraphX uses hash tables rather than dense arrays to maintain its per-vertex state, which provides a layer of robustness but can add an order of magnitude to the critical path. Fault tolerance calls for data to be pushed from fast random access memory out to stable storage to insure against lost work in the case of a failure.
- Analytics only on static graphs (what if the graph changes very often?). This could perhaps be supported in the future within the lineage subsystem in Spark.
- Naiad (which incorporates asynchrony, not present in GraphX) was not really compared to GraphX. Naiad was found to be much faster than the other systems, including the specialized ones.

- PageRank is a typical algorithm for graph computation:
  - The local rank of a page can be expressed as a weighted sum of the ranks of the neighboring pages. Rank of page i: $R[i] = 0.15 + 0.85 \sum_{j \in Links(i)} \frac{R[j]}{OutLinks(j)}$, where 0.15 is the random reset probability and 0.85 = 1 - 0.15 damps the neighbors' contribution.
  - It runs until convergence.
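
The PageRank iteration can be sketched in a few lines of plain Python. This is only an illustration on a single machine, not GraphX code: the function name `pagerank`, the `links` dictionary and the tolerance are assumptions made for the example, and the neighbor sum is damped by `1 - reset` (without that factor the iteration would not converge).

```python
def pagerank(links, reset=0.15, tol=1e-6, max_iter=100):
    """links maps each page to the list of pages it links to (hypothetical input format)."""
    pages = set(links) | {dst for dsts in links.values() for dst in dsts}
    rank = {page: 1.0 for page in pages}
    for _ in range(max_iter):
        # Sum R[j] / OutLinks(j) over every page j that links to page i.
        incoming = {page: 0.0 for page in pages}
        for src, dsts in links.items():
            for dst in dsts:
                incoming[dst] += rank[src] / len(dsts)
        # Reset probability plus the damped, weighted sum of neighbor ranks.
        new_rank = {p: reset + (1.0 - reset) * incoming[p] for p in pages}
        delta = max(abs(new_rank[p] - rank[p]) for p in pages)
        rank = new_rank
        if delta < tol:  # "runs until convergence"
            break
    return rank


ranks = pagerank({"a": ["b", "c"], "b": ["c"], "c": ["a"]})
```

Page `c` receives rank from both `a` and `b`, so it ends up ranked above `b`, which only receives half of the rank of `a`.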

- Vertex-centric pattern of graph computation, i.e. “Think like a vertex” (Malewicz, SIGMOD 2010):
  - Graph-parallel pattern (Gonzalez, OSDI 2012):
    - Gather - collect information from neighboring vertices
    - Apply - update the vertex value
    - Scatter - send the new value information to neighboring vertices
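
As a rough single-machine illustration of the gather, apply and scatter steps, the sketch below labels connected components by propagating the smallest vertex id. The function name and the choice of algorithm are assumptions for the example; it also keeps a set of active vertices, so only vertices whose neighborhood changed are revisited.

```python
from collections import defaultdict


def connected_components(vertices, edges):
    """Label every vertex with the smallest vertex id in its (undirected) component."""
    neighbors = defaultdict(set)
    for u, v in edges:
        neighbors[u].add(v)
        neighbors[v].add(u)

    label = {v: v for v in vertices}
    active = set(vertices)
    while active:
        next_active = set()
        for v in active:
            # Gather: collect the labels of the neighboring vertices.
            gathered = min((label[n] for n in neighbors[v]), default=label[v])
            # Apply: update the vertex value if a smaller label was seen.
            if gathered < label[v]:
                label[v] = gathered
                # Scatter: notify the neighbors that the value changed.
                next_active |= neighbors[v]
        active = next_active
    return label


labels = connected_components([1, 2, 3, 4, 5], [(1, 2), (2, 3), (4, 5)])
```

The loop stops once no vertex changes its label, which is the vertex-program equivalent of running until convergence.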

- Graph algorithms are used in machine learning and network analysis.
- Machine learning with graphs: collaborative filtering (alternating least squares, stochastic gradient descent, tensor factorization), structured prediction (Loopy Belief Propagation, Max-Product Linear Program, Gibbs sampling), Semi-supervised ML (Graph SSL, CoEM).
- Graph algorithms in network analysis: community detection (triangle counting, k-core decomposition, k-truss), graph analytics (PageRank, Personalized PageRank, shortest path, graph coloring).
- GraphX is about moving from the specialized computation pattern to specialized graph optimizations: how to create optimizations that exploit the pattern.
- Graph system optimizations:
  - Specialized data structures to find neighboring vertices and edges.
  - Vertex-cut partitioning - cut along vertices instead of along edges, especially for power-law graphs, which are often seen in real use cases.
  - Exploit the partitioning of the graph with remote caching/mirroring - keep copies of vertices on remote machines, so that each machine can do the reduction for a vertex over its locally adjacent neighbors.
  - Message combiners, a concept introduced in Pregel - a commutative, associative way of combining messages from remote machines. It reduces communication over the network and exploits the graph topology.
  - Active set tracking - track the vertices that change in each iteration (their number often decreases rapidly) and focus the computational resources on the active vertices.
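
To make the effect of message combiners on partitioned edges concrete, here is a small sketch in plain Python. Everything in it is an assumption for illustration: the helper names, the toy partitioning function `(src * 31 + dst) % num_parts` (not what any real system uses), and the use of a sum as the combiner.

```python
from collections import defaultdict


def partition_edges(edges, num_parts):
    """Assign each edge to a partition with a toy deterministic hash."""
    parts = [[] for _ in range(num_parts)]
    for src, dst in edges:
        parts[(src * 31 + dst) % num_parts].append((src, dst))
    return parts


def combine_locally(parts, value):
    """Each partition pre-aggregates the messages per destination vertex."""
    per_partition = []
    for part in parts:
        local = defaultdict(float)
        for src, dst in part:
            local[dst] += value[src]  # commutative, associative combiner (sum)
        per_partition.append(dict(local))
    return per_partition


def merge(per_partition):
    """Merge the partial results, as the destination vertices would."""
    total = defaultdict(float)
    for local in per_partition:
        for dst, msg in local.items():
            total[dst] += msg
    return dict(total)


edges = [(1, 0), (2, 0), (3, 0), (4, 0), (1, 2)]
value = {v: 1.0 for v in range(5)}
partials = combine_locally(partition_edges(edges, 2), value)
messages_sent = sum(len(local) for local in partials)
result = merge(partials)
```

In this example five edges produce only three combined messages, because the high-degree vertex `0` receives one partial sum per partition instead of one message per edge.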

- GraphX tackles two problems:
  - Representation:
    - structure: express distributed graphs as horizontally partitioned tables
    - operations: how to represent a vertex program as joins (or other dataflow operators)
  - Optimizations:
    - Re-cast the optimizations developed for specialized graph systems in the context of more general distributed dataflow systems:
      - distributed join optimization
      - materialized view maintenance

- Representation:
  - Property graph data model:
    - Vertex property
      - user profile
      - current PageRank value
    - Edge property
      - weights
      - timestamps
  - Re-interpret the graph as tables to encode it in the dataflow framework, using a normalized (third normal form) representation:
    - Vertex Table (RDD) - vertices stored in a horizontally partitioned table (partitioned by vertex id)
    - Edge Table (RDD) - a vertex-cut algorithm is used to partition the edges across the machines
    - Routing Table (RDD) - which edges are adjacent to a given vertex (each vertex with its list of edges); this is a many-to-many relationship.
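
The three tables can be mimicked with ordinary Python collections. This is a sketch under stated assumptions, not the GraphX data structures: partitions are plain lists and dicts, the edge partitioner is a toy hash, and the routing table is assumed to store, per vertex, the ids of the edge partitions that hold its adjacent edges (a coarser form of the per-vertex edge list described above).

```python
def build_tables(vertices, edges, num_parts):
    """vertices: {id: property}; edges: [(src, dst, property)] (hypothetical formats)."""
    # Vertex table: horizontally partitioned by vertex id.
    vertex_table = [dict() for _ in range(num_parts)]
    for vid, prop in vertices.items():
        vertex_table[vid % num_parts][vid] = prop

    # Edge table: edges spread over partitions (toy stand-in for a vertex cut).
    edge_table = [[] for _ in range(num_parts)]
    for src, dst, prop in edges:
        edge_table[(src * 31 + dst) % num_parts].append((src, dst, prop))

    # Routing table: vertex id -> edge partitions containing adjacent edges.
    routing_table = {}
    for pid, part in enumerate(edge_table):
        for src, dst, _ in part:
            routing_table.setdefault(src, set()).add(pid)
            routing_table.setdefault(dst, set()).add(pid)
    return vertex_table, edge_table, routing_table


vertex_table, edge_table, routing_table = build_tables(
    {0: "a", 1: "b", 2: "c", 3: "d"},
    [(0, 1, 1.0), (1, 2, 2.0), (2, 0, 3.0)],
    num_parts=2,
)
```

With such a routing table, a vertex property only needs to be shipped to the edge partitions listed for that vertex; vertex `3` has no edges and is not shipped anywhere.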

- Separate properties and structure: structural information can be reused across multiple graphs (e.g. after a transform of the vertex properties), giving many views of the same data. The logical view of the graph is bound to the physical/tabular representation.
- Triplets: logical join of the vertices and edges.
- API methods:
  - reverse a graph (?)
  - subgraph (focus on specific edges and vertices)
  - join - external tabular information with the graph
  - mrTriplets - the core operation, the map-reduce triplets operator, which captures the gather-scatter pattern from the specialized graph processing systems.

- Triplets - join vertices and edges: join the vertex table (RDD) with the edge table (RDD). For a given edge, attach the properties of its two end vertices from the vertex table (RDD), which gives a triplets table (Vertex1 - Edge - Vertex2). Then execute MapReduce jobs on the triplets table: the map function is applied to each triplet and creates a message to one of its endpoint vertices, Map(v1 - edge - v2) -> (v, message). The user defines a commutative, associative reduction operation, which is a direct analog of the message combiners.
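
The triplets join and the map-reduce step can be sketched in plain Python as follows. The names `triplets` and `mr_triplets`, their signatures and the follower example are assumptions for illustration and do not reproduce the GraphX API.

```python
def triplets(vertices, edges):
    """Join the vertex table with the edge table: one (src, edge, dst) row per edge."""
    return [
        ((src, vertices[src]), attr, (dst, vertices[dst]))
        for src, dst, attr in edges
    ]


def mr_triplets(vertices, edges, map_fn, reduce_fn):
    """Apply map_fn to every triplet, then combine messages per vertex with reduce_fn."""
    messages = {}
    for src, attr, dst in triplets(vertices, edges):
        for vid, msg in map_fn(src, attr, dst):
            # reduce_fn must be commutative and associative (a message combiner).
            messages[vid] = reduce_fn(messages[vid], msg) if vid in messages else msg
    return messages


# Example: for each user, count the followers who are older than the user.
ages = {1: 30, 2: 25, 3: 40}
follows = [(1, 2, "follows"), (3, 2, "follows"), (2, 3, "follows")]


def older_follower(src, attr, dst):
    (_, src_age), (dst_id, dst_age) = src, dst
    return [(dst_id, 1)] if src_age > dst_age else []


older_followers = mr_triplets(ages, follows, older_follower, lambda a, b: a + b)
```

Here user `2` is followed by two older users, while user `3` is only followed by a younger one and therefore receives no message at all.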