博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于MaxCompute的图计算实践分享-Resolver简介
阅读量:6608 次
发布时间:2019-06-24

本文共 8511 字,大约阅读时间需要 28 分钟。

更多精彩内容参见云栖社区大数据频道,此外,通过Maxcompute及其配套产品,大数据分析仅需几步,详情访问。

Resolver简介

在学习使用MaxCompute-Graph计算模型时,resolver是一个不容易理解的概念。在MaxCompute帮助文档  中对于Resolver有相关介绍:VertexResolver 用于自定义图拓扑修改时的冲突处理逻辑.

​在Graph的迭代计算模型中,图的拓扑结构会在下面两个场景中发生变化:

​1,在图加载阶段(LOAD),worker会读取输入数据,然后发送修改图拓扑结构的请求;在RESOLVE_LOADING_MUTATION阶段,worker会根据LOAD阶段收到的图拓扑结构变更请求,对图的拓扑结构进行修改;

​2,在每个superstep的COMPUTE阶段,worker也会发送修改图拓扑结构的请求;在下一个superstep的RESOLVE_COMPUTING_MUTATION阶段,会处理收集到上个superstep中发送到该Vertex的请求,然后对图的拓扑结构进行修改。

 

图拓扑结构变化的流程

​下图是COMPUTE阶段图拓扑结构发生变化的示例:

alt

1,在superstep:k的compute阶段,vertex会试图改变图的拓扑结构,修改图拓扑结构的请求包括以下四种:

  1. ​addVertexRequest:添加顶点
  2. ​removeVertexRequest:删除顶点
  3. ​addEdgeRequest:添加边
  4. ​removeEdgeRequest:删除边

这些改变图拓扑结构的请求并不会马上执行并返回,而只是记录在了目标顶点所在的worker上。

2,这些改变图拓扑结构的请求会根据操作顶点的id,partition分发到对应worker上,每个worker会将在superstep:k中收集到的图变更请求保存在serverData.partitionMutations中。这样在superstep:k+1时,每个顶点可以从serverData中获取到对应自己操作的所有请求。

3,在superstep:K+1的RESOLVE_COMPUTING_MUTATION阶段,worker会从serverData获取到对应每个vertexID的vertexChanges(即上一个superstep中对改顶点的图修改请求集合),分别对应然后将其作为参数调用用户定义的resole方法,在resolve方法中完成对当前Vertex结构的修改。

4,完成RESOLVE_COMPUTING_MUTATION阶段之后,开始superstep:K+1的compute阶段。

 

如何定义resolver

用户可以自定义VertexResolver的实现,即自定义类继承VertexResolver并重写VertexResolver.resolve方法,然后在提交作业时通过:

  1. job.setLoadingVertexResolverClass(VertexResolver.class)
  2. job.setComputingVertexResolverClass(VertexResolver.class)
 
分别指定在图加载阶段和COMPUTE阶段处理图拓扑结构修改请求的resolver实现类。对于LoadingVertexResolver,如果用户没有指定,框架会提供一个默认的Default
LoadingVertexResolver实现;但是对于
ComputingVertexResolver,如果没有指定,是没有默认实现的,就是说如果用户需要在compute阶段对图的拓扑结构进行修改,则必须指定Resolver实现,否则会抛异常。

关于“冲突”

为什么graph的框架需要用户去自定义Resolver的实现,一个很重要的原因是在处理修改图拓扑结构时可能会出现各种“冲突”。

在graph模型中,修改图拓扑结构是一个异“步(superstep)”的请求,在某次superstep中发起的请求,需要到下一个superstep中才能生效。同时对于同一个顶点的修改请求,可能来自各个不同的worker,彼此之间不能相互感知,来自不同worker的请求先后顺序也是不确定的,所以会出现很多冲突的场景,例如对通过一个顶点进行多次添加,删除不存在的边等等。

在默认的DefaultLoadingVertexResolver实现中,对于下面这些场景会认为是“冲突”,然后会抛出异常:

  • 添加重复点:通常使用 addVertexRequest 添加点,而相同 ID 的点已经存在。
  • 添加重复边:通常使用 addEdgeRequest 添加边时,而相同起点和终点的边已经存在;或者使用 addVertexRequest 添加点时,待添加点中存在重复边。
  • 删除不存在的边:通常使用 removeEdgeRequest 删除边时,待删除的边不存在。
  • 删除不存在的点:通常使用 removeVertexRequest 删除点时,待删除的点不存在。
  • 发送消息到不存在的点:通常使用 sendMessage 发送消息时,目标点不存在。

用户自定义的Resolver中可以自行定义冲突场景,或者对上述冲突场景做其他处理。

​SSSP_Split的例子

​在graph提供的example中提供的三个例子都没有自定义resolver,也没有在迭代过程中改变图拓扑结构。下面这个例子是在原先最简单的例子SSSP(单源最短路径)的基础上增加了分裂顶点的功能,即在某轮迭代中发现某个顶点的出边数大于某个阈值时,将其分裂成两个顶点,这两个顶点之间的距离为0。

package com.test;import java.io.IOException;import java.util.Iterator;import java.util.List;import com.aliyun.odps.Record;import com.aliyun.odps.graph.Combiner;import com.aliyun.odps.graph.ComputeContext;import com.aliyun.odps.graph.DefaultLoadingVertexResolver;import com.aliyun.odps.graph.Edge;import com.aliyun.odps.graph.GraphJob;import com.aliyun.odps.graph.GraphLoader;import com.aliyun.odps.graph.MutationContext;import com.aliyun.odps.graph.Vertex;import com.aliyun.odps.graph.VertexChanges;import com.aliyun.odps.graph.VertexResolver;import com.aliyun.odps.graph.WorkerContext;import com.aliyun.odps.io.LongWritable;import com.aliyun.odps.io.TableInfo;import com.aliyun.odps.io.Text;import com.aliyun.odps.io.WritableComparable;public class SSSP_Split {  public static final String START_VERTEX = "sssp.start.vertex.id";  public static final String MAX_DEGREE = "sssp.vertex.max.degree";  public static class SSSPVertex extends      Vertex
{ private static long startVertexId = -1; private static long STEP_BASE = 100; public SSSPVertex() { this.setValue(new LongWritable(Long.MAX_VALUE)); } public boolean isStartVertex( ComputeContext
context) { if (startVertexId == -1) { String s = context.getConfiguration().get(START_VERTEX); startVertexId = Long.parseLong(s); } return getId().get() == startVertexId; } @Override public void compute( ComputeContext
context, Iterable
messages) throws IOException { long minDist = isStartVertex(context) ? 0 : Long.MAX_VALUE; for (LongWritable msg : messages) { if (msg.get() < minDist) { minDist = msg.get(); } } if (minDist < this.getValue().get()) { this.setValue(new LongWritable(minDist)); if (hasEdges()) { for (Edge
e : this.getEdges()) { context.sendMessage(e.getDestVertexId(), new LongWritable(minDist + e.getValue().get())); } } } else { voteToHalt(); } // 这里执行分裂顶点的逻辑,当顶点的出边数大于阈值sssp.vertex.max.degree时执行分裂, // 并且分裂行为只在前3轮发生. if (this.getEdges().size() > context.getConfiguration().getInt( MAX_DEGREE, Integer.MAX_VALUE) && context.getSuperstep() < 3) { SSSPVertex splitVertex = new SSSPVertex(); // 分裂的顶点id为原id+100*2^迭代轮次,这样避免id出现冲突(初始顶点数小于100) // 即顶点3在第0步时分裂出来顶点103,顶点103在第2轮时分裂出顶点503 splitVertex.setId(new LongWritable(getId().get() + (STEP_BASE << context.getSuperstep()))); splitVertex.setValue(getValue()); context.addVertexRequest(splitVertex); // 添加原始顶点与分裂顶点之间的边,距离为0 context.addEdgeRequest(splitVertex.getId(), new Edge
(this.getId(), new LongWritable(0))); context.addEdgeRequest(getId(), new Edge
( splitVertex.getId(), new LongWritable(0))); // 将原始顶点上一半的出边转移到分裂顶点上。 for (int i = 0; i < this.getEdges().size(); i++) { if (i % 2 == 0) { context.removeEdgeRequest(getId(), getEdges().get(i).getDestVertexId()); context.addEdgeRequest(splitVertex.getId(), getEdges().get(i)); } } } } @Override public void cleanup( WorkerContext
context) throws IOException { String edges = ""; for (int i =0;i
{ @Override public void combine(LongWritable vertexId, LongWritable combinedMessage, LongWritable messageToCombine) throws IOException { if (combinedMessage.get() > messageToCombine.get()) { combinedMessage.set(messageToCombine.get()); } } } public static class SSSPVertexReader extends GraphLoader
{ @Override public void load( LongWritable recordNum, Record record, MutationContext
context) throws IOException { SSSPVertex vertex = new SSSPVertex(); vertex.setId((LongWritable) record.get(0)); String[] edges = record.get(1).toString().split(","); for (int i = 0; i < edges.length; i++) { String[] ss = edges[i].split(":"); vertex.addEdge(new LongWritable(Long.parseLong(ss[0])), new LongWritable(Long.parseLong(ss[1]))); } context.addVertexRequest(vertex); } } // 这里定义computer阶段的resolver public static class SSSPVertexResolver extends VertexResolver{ @Override public Vertex resolve(WritableComparable vertexId, Vertex vertex, VertexChanges vertexChanges, boolean hasMessages) throws IOException { // 处理添加顶点的请求 if (vertexChanges.getAddedVertexList() != null && vertexChanges.getAddedVertexList().size() > 0) { vertex = (Vertex) vertexChanges.getAddedVertexList().get(0); } // 处理添加边的请求 if (vertexChanges.getAddedEdgeList() != null && vertexChanges.getAddedEdgeList().size() > 0) { for (Edge
edge : (List
>) (vertexChanges .getAddedEdgeList())) { vertex.addEdge(edge.getDestVertexId(), edge.getValue()); } } // 处理删除边的请求 if (vertexChanges.getRemovedEdgeList() != null && vertexChanges.getRemovedEdgeList().size() > 0) { for (LongWritable removedDestVertex : (List
)vertexChanges.getRemovedEdgeList()) { List
> edgeList = vertex.getEdges(); for (Iterator
> edges = edgeList.iterator(); edges.hasNext();) { Edge
edge = edges.next(); if (edge.getDestVertexId().equals(removedDestVertex)) { edges.remove(); } } } } // 处理删除顶点的请求 if (vertexChanges.getRemovedVertexCount() > 0) { // do nothing } return vertex; } } public static void main(String[] args) throws IOException { if (args.length < 3) { System.out.println("Usage:
"); System.exit(-1); } GraphJob job = new GraphJob(); job.setGraphLoaderClass(SSSPVertexReader.class); job.setVertexClass(SSSPVertex.class); job.setCombinerClass(MinLongCombiner.class); // 设置compute阶段的resolver job.setComputingVertexResolverClass(SSSPVertexResolver.class); job.set(START_VERTEX, args[0]); job.set(MAX_DEGREE, args[1]); job.addInput(new TableInfo(args[2])); job.addOutput(new TableInfo(args[3])); long startTime = System.currentTimeMillis(); job.run(); System.out.println("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); }}

输入表数据ssp_in:

1,"2:2,3:1,4:4,5:5"2,"1:2,3:2,4:1,5:4"3,"1:1,2:2,4:2,5:3"4,"1:4,2:1,3:2,5:1"5,"1:5,2:4,3:1,4:1"

输出表数据sssp_out:

102,2:1:4:,2103,3:1:4:,11,3:5:101:,02,3:5:102:,2101,1:2:4:,03,2:5:103:,14,2:5:104:,35,2:4:105:,4104,4:1:3:,3105,5:1:3:,4

输出数据中,分裂出来的点与源点(顶点103和顶点3)与目秒顶点的距离相同,两者实际仍然是同一个点。

转载地址:http://nwdso.baihongyu.com/

你可能感兴趣的文章
Ansible之playbook的使用
查看>>
ansible模块批量管理
查看>>
redis命令 - GET
查看>>
httpd.conf的基本设置
查看>>
RHEL/Centos7新功能
查看>>
第一部分 思科九年 一(1)
查看>>
DBA日常工作职责
查看>>
Redis的持久化
查看>>
linux安装NFS服务器学习
查看>>
Planner .NET日历日程控件能给你的应用程序提供多种日历日程功能
查看>>
我的友情链接
查看>>
Linux压力测试
查看>>
JAVA中的线程机制(二)
查看>>
nginx安装与配置2(转载)
查看>>
Linux下Mongodb安装和启动配置
查看>>
2015 成长计划
查看>>
沈阳一饭店凌晨爆燃,燃气报警器时刻预防
查看>>
Redis 与 数据库处理数据的两种模式
查看>>
VUE2中axios的使用方法
查看>>
CS 229 notes Supervised Learning
查看>>