—client 的调用流程
delete 数据的流程.(table.delete(deleteColumn);)
(源码基于-1.1.5版本)
HTable table = new HTable(conf, Bytes.toBytes(tableName));
- 1
- 1
HTable 对象创建时调用如下方法创建对远程的链接对象管理器
ConnectionManager.getConnectionInternal(conf) ConnectionFactory.createConnection(conf, managed, pool, user)
默认为 HConnectionImplementation 类
通过 ZooKeeperRegistry.getClusterId 拿到集群的id 在该方法中调用 HConnectionImplementation.getKeepAliveZooKeeperWatcher 拿到zk的链接 然后调用如下方法拿到clousterIdthis.clusterId = ZKClusterId.readClusterIdZNode(zkw);
接下来创建rpcclient
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
实现类为 RpcClientImpl
这样一个hbase对象的创建就完成了。table.delete(deleteColumn);开始调用。
创建一个 RegionServerCallable 对象RegionServerCallablecallable = new RegionServerCallable (connection,tableName, delete.getRow())
包含了要删除的对应的表名和那一行。
然后通过调用一下多次尝试的对象调用rpcCallerFactory.newCaller(rpcTimeout).callWithRetries(callable, this.operationTimeout);
在caller中,会调用到callable.prepare,进行初始化。
在初始化的时候,要拿到对应的regionserver信息try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) { this.location = regionLocator.getRegionLocation(row, reload);}
然后通过读取的是那一个主键,去进行定
locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID)
然后拼接要删除的行的对象
byte[] metaKey = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
通过创建
Scan s = new Scan(); s.setReversed(true); s.setStartRow(metaKey); s.setSmall(true); s.setCaching(1);
对象到发送到 meta元数据中进行查询
rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0);
然后发起对 hbase:meta 元数据表的查询
smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan, getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(), getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);
然后发起了查询了
values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
然后创建对hbase:meta表的查询请求
RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,currentScannerCallable.getRow()); 在getRegionLocations 方法中 rl = cConnection.locateRegion(tableName, row, useCache, true, replicaId);
发起元数据远程连接的调用
@Overridepublic RegionLocations locateRegion(final TableName tableName, final byte [] row, boolean useCache, boolean retry, int replicaId)throws IOException { if (this.closed) throw new IOException(toString() + " closed"); if (tableName== null || tableName.getName().length == 0) { throw new IllegalArgumentException( "table name cannot be null or zero length"); } if (tableName.equals(TableName.META_TABLE_NAME)) { return locateMeta(tableName, useCache, replicaId); } else { // Region not in the cache - have to go to the meta RS return locateRegionInMeta(tableName, row, useCache, retry, replicaId); }}
通过判断是否是元数据表,进行不同的定位,如果是元数据,就去zk中进行读取,否则,
在元数据中
locations = this.registry.getMetaRegionLocation();
拿到meta元数据的位置信息,这个meta元数据的位置信息是存放在zk中的
通过下面的方法读取Listservers = new MetaTableLocator().blockUntilAvailable(zkw, hci.rpcTimeout, hci.getConfiguration());
拿到后,会缓存在client端
然后就创建到hmaster的连接了setStub(super.getConnection().getClient(dest));(dest=myubuntu,16020,1468468352472)
创建直接到这个目标端口的rpc 客户端连接
BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); stub = ClientService.newBlockingStub(channel);
创建了 BlockingRpcChannelImplementation对象
真正发起了请求了(这个时候的tableName ==hbase:meta)还是元数据表@Overridepublic Result[] call(int timeout) throws IOException { if (this.closed) return null; if (Thread.interrupted()) {throw new InterruptedIOException(); } ScanRequest request = RequestConverter.buildScanRequest(getLocation() .getRegionInfo().getRegionName(), getScan(), getCaching(), true); ScanResponse response = null; controller = controllerFactory.newController(); try {controller.setPriority(getTableName());controller.setCallTimeout(timeout);response = getStub().scan(controller, request);Result[] results = ResponseConverter.getResults(controller.cellScanner(),response);if (response.hasMoreResultsInRegion()) { setHasMoreResultsContext(true); setServerHasMoreResults(response.getMoreResultsInRegion());} else { setHasMoreResultsContext(false);}// We need to update result metrics since we are overriding call()updateResultsMetrics(results);return results; } catch (ServiceException se) {throw ProtobufUtil.getRemoteException(se); }}
request 对象内容为(blog2为我要删除的表。rowkey1为我要删除的主键)
region { type: REGION_NAME value: "hbase:meta,,1"}scan { start_row: "blog2,rowkey1,99999999999999" max_versions: 1 cache_blocks: true small: true reversed: true caching: 1}number_of_rows: 1close_scanner: trueclient_handles_partials: trueclient_handles_heartbeats: true
然后在 RpcClientImpl 的Connection对象的创建过程中
设置了servicename =ClientServiceConnectionHeader.Builder builder = ConnectionHeader.newBuilder(); builder.setServiceName(remoteId.getServiceName()); UserInformation userInfoPB = getUserInfo(ticket); if (userInfoPB != null) {builder.setUserInfo(userInfoPB); } if (this.codec != null) {builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName()); } if (this.compressor != null) {builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName()); } builder.setVersionInfo(ProtobufUtil.getVersionInfo()); this.header = builder.build();
这样就设置了头部信息了
然后就根据刚才创建的connection信息,进行了发送查询final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,pcrc.getCallTimeout());final Connection connection = getConnection(ticket, call, addr);connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan());
然后现在就连接到远程 setupIOstreams();
NetUtils.connect(this.socket, remoteId.getAddress(), connectTO);
在这里直接发送数据过去了
IPCUtil.write(this.out, header, call.param, cellBlock);当把结果拿到后
setStub(getConnection().getClient(this.location.getServerName()));(location = region=blog2,,1468401068588.f8572806e5866a27ccf7464ec899bb18., hostname=myubuntu,16020,1468468352472, seqNum=25)
就可以连接到具体的那个业务表的regionserver了
拿到地址后,最后回调回对原始方法的调用public void delete(final Delete delete) throws IOException {RegionServerCallablecallable = new RegionServerCallable (connection,tableName, delete.getRow()) { @Override public Boolean call(int callTimeout) throws IOException {PayloadCarryingRpcController controller = rpcControllerFactory.newController();controller.setPriority(tableName);controller.setCallTimeout(callTimeout);try { MutateRequest request = RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), delete); MutateResponse response = getStub().mutate(controller, request); return Boolean.valueOf(response.getProcessed());} catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se);} }};rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable,this.operationTimeout); }
tableName = blog2
最终返回到用户了