博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
hbase客户端源码分析调用
阅读量:6808 次
发布时间:2019-06-26

本文共 7079 字,大约阅读时间需要 23 分钟。

hot3.png

—client 的调用流程

delete 数据的流程.(table.delete(deleteColumn);)

(源码基于-1.1.5版本)

HTable table = new HTable(conf, Bytes.toBytes(tableName));
  • 1
  • 1

HTable 对象创建时调用如下方法创建对远程的链接对象管理器

ConnectionManager.getConnectionInternal(conf) ConnectionFactory.createConnection(conf, managed, pool, user)

默认为 HConnectionImplementation 类 

通过 ZooKeeperRegistry.getClusterId 拿到集群的id 
在该方法中调用 
HConnectionImplementation.getKeepAliveZooKeeperWatcher 
拿到zk的链接 
然后调用如下方法拿到clousterId

this.clusterId = ZKClusterId.readClusterIdZNode(zkw);

接下来创建rpcclient

this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);

实现类为 RpcClientImpl 

这样一个hbase对象的创建就完成了。

table.delete(deleteColumn);开始调用。 

创建一个 RegionServerCallable 对象

RegionServerCallable
callable = new RegionServerCallable
(connection,tableName, delete.getRow())

包含了要删除的对应的表名和那一行。 

然后通过调用一下多次尝试的对象调用

rpcCallerFactory.
newCaller(rpcTimeout).callWithRetries(callable, this.operationTimeout);

在caller中,会调用到callable.prepare,进行初始化。 

在初始化的时候,要拿到对应的regionserver信息

try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {  this.location = regionLocator.getRegionLocation(row, reload);}

然后通过读取的是那一个主键,去进行定

locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID)

然后拼接要删除的行的对象

byte[] metaKey = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);

通过创建

Scan s = new Scan();  s.setReversed(true);  s.setStartRow(metaKey);  s.setSmall(true);  s.setCaching(1);

对象到发送到 meta元数据中进行查询

rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0);

然后发起对 hbase:meta 元数据表的查询

smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan, getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(), getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);

然后发起了查询了

values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);

然后创建对hbase:meta表的查询请求

RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,currentScannerCallable.getRow());     在getRegionLocations 方法中   rl = cConnection.locateRegion(tableName, row, useCache, true, replicaId);

发起元数据远程连接的调用

@Overridepublic RegionLocations locateRegion(final TableName tableName,  final byte [] row, boolean useCache, boolean retry, int replicaId)throws IOException {  if (this.closed) throw new IOException(toString() + " closed");  if (tableName== null || tableName.getName().length == 0) {    throw new IllegalArgumentException(        "table name cannot be null or zero length");  }  if (tableName.equals(TableName.META_TABLE_NAME)) {    return locateMeta(tableName, useCache, replicaId);  } else {    // Region not in the cache - have to go to the meta RS    return locateRegionInMeta(tableName, row, useCache, retry, replicaId);  }}

通过判断是否是元数据表,进行不同的定位,如果是元数据,就去zk中进行读取,否则,

在元数据中

locations = this.registry.getMetaRegionLocation();

拿到meta元数据的位置信息,这个meta元数据的位置信息是存放在zk中的 

通过下面的方法读取

List
servers = new MetaTableLocator().blockUntilAvailable(zkw, hci.rpcTimeout, hci.getConfiguration());

拿到后,会缓存在client端 

然后就创建到hmaster的连接了

setStub(super.getConnection().getClient(dest));(dest=myubuntu,16020,1468468352472)

创建直接到这个目标端口的rpc 客户端连接

BlockingRpcChannel channel =  this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);  stub = ClientService.newBlockingStub(channel);

创建了 BlockingRpcChannelImplementation对象 

真正发起了请求了(这个时候的tableName ==hbase:meta)还是元数据表

@Overridepublic Result[] call(int timeout) throws IOException {  if (this.closed) return null;  if (Thread.interrupted()) {throw new InterruptedIOException();  }  ScanRequest request = RequestConverter.buildScanRequest(getLocation()  .getRegionInfo().getRegionName(), getScan(), getCaching(), true);  ScanResponse response = null;  controller = controllerFactory.newController();  try {controller.setPriority(getTableName());controller.setCallTimeout(timeout);response = getStub().scan(controller, request);Result[] results = ResponseConverter.getResults(controller.cellScanner(),response);if (response.hasMoreResultsInRegion()) {  setHasMoreResultsContext(true);  setServerHasMoreResults(response.getMoreResultsInRegion());} else {  setHasMoreResultsContext(false);}// We need to update result metrics since we are overriding call()updateResultsMetrics(results);return results;  } catch (ServiceException se) {throw ProtobufUtil.getRemoteException(se);  }}

request 对象内容为(blog2为我要删除的表。rowkey1为我要删除的主键)

region {  type: REGION_NAME  value: "hbase:meta,,1"}scan {  start_row: "blog2,rowkey1,99999999999999"  max_versions: 1  cache_blocks: true  small: true  reversed: true  caching: 1}number_of_rows: 1close_scanner: trueclient_handles_partials: trueclient_handles_heartbeats: true

然后在 RpcClientImpl 的Connection对象的创建过程中 

设置了servicename =ClientService

ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();  builder.setServiceName(remoteId.getServiceName());  UserInformation userInfoPB = getUserInfo(ticket);  if (userInfoPB != null) {builder.setUserInfo(userInfoPB);  }  if (this.codec != null) {builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName());  }  if (this.compressor != null) {builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName());  }  builder.setVersionInfo(ProtobufUtil.getVersionInfo());  this.header = builder.build();

这样就设置了头部信息了 

然后就根据刚才创建的connection信息,进行了发送查询

final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,pcrc.getCallTimeout());final Connection connection = getConnection(ticket, call, addr);connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan());

然后现在就连接到远程 setupIOstreams();

NetUtils.connect(this.socket, remoteId.getAddress(), connectTO);

在这里直接发送数据过去了 

IPCUtil.write(this.out, header, call.param, cellBlock);

当把结果拿到后

setStub(getConnection().getClient(this.location.getServerName()));(location = region=blog2,,1468401068588.f8572806e5866a27ccf7464ec899bb18., hostname=myubuntu,16020,1468468352472, seqNum=25)

就可以连接到具体的那个业务表的regionserver了 

拿到地址后,最后回调回对原始方法的调用

public void delete(final Delete delete)  throws IOException {RegionServerCallable
callable = new RegionServerCallable
(connection,tableName, delete.getRow()) { @Override public Boolean call(int callTimeout) throws IOException {PayloadCarryingRpcController controller = rpcControllerFactory.newController();controller.setPriority(tableName);controller.setCallTimeout(callTimeout);try { MutateRequest request = RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), delete); MutateResponse response = getStub().mutate(controller, request); return Boolean.valueOf(response.getProcessed());} catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se);} }};rpcCallerFactory.
newCaller(rpcTimeout).callWithRetries(callable,this.operationTimeout); }

tableName = blog2

最终返回到用户了

转载于:https://my.oschina.net/sniperLi/blog/910772

你可能感兴趣的文章
【mybatis深度历险系列】mybatis中的动态sql
查看>>
UWP-磁贴初识
查看>>
keepalived and heartbeat
查看>>
Git--分支管理
查看>>
我们不知道我们不知道:用同化项目做网络安全
查看>>
简单高效的短链接生成服务C#实现
查看>>
Java调用Memcache入门
查看>>
如何开发一个完整的JavaScript组件
查看>>
8个字符即可令Skype崩溃而且再也打不开
查看>>
IBM推出全闪存产品 加速服务器I/O性能
查看>>
高德地图提前上线多条重要道路预通车机制不断成熟
查看>>
大规模机器学习:将数据科学引入生产系统架构的典型模式
查看>>
做可穿戴医疗,你的对手是“天性”
查看>>
FortiGuard 实验室报告:全球受攻击的IoT设备呈指数级增长
查看>>
国内趋于概念化的 “数据分析”在硅谷是怎样真正落地的?
查看>>
物联网时代如何保障数据安全
查看>>
你还敢用鼠标吗?黑客在百米之外控制你的鼠标
查看>>
比特币勒索攻击技术演进与趋势
查看>>
《iOS取证实战:调查、分析与移动安全》一3.6 iPhone操作系统
查看>>
苹果HomeKit智能家居战略藏野心:欲实现下一次颠覆
查看>>