博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
FileSystem实例化过程
阅读量:6701 次
发布时间:2019-06-25

本文共 6561 字,大约阅读时间需要 21 分钟。

HDFS案例代码

Configuration configuration = new Configuration();FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop000:8020"), configuration);    InputStream in = fileSystem.open(new Path(HDFS_PATH+"/hdfsapi/test/log4j.properties"));OutputStream out = new FileOutputStream(new File("log4j_download.properties"));IOUtils.copyBytes(in, out, 4096, true); //最后一个参数表示完成拷贝之后关闭输入/出流

 

FileSystem.java

static final Cache CACHE = new Cache();public static FileSystem get(URI uri, Configuration conf) throws IOException {    String scheme = uri.getScheme();   //hdfs    String authority = uri.getAuthority();  //hadoop000:8020    return CACHE.get(uri, conf);}FileSystem get(URI uri, Configuration conf) throws IOException{    Key key = new Key(uri, conf);    return getInternal(uri, conf, key);}private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{    FileSystem fs;    synchronized (this) {        fs = map.get(key);    }        //根据URI取得一个FileSystem实例,如果允许缓存,会中从缓存中取出,否则将调用createFileSystem创建一个新实例    if (fs != null) {         return fs;    }        fs = createFileSystem(uri, conf);    synchronized (this) {         FileSystem oldfs = map.get(key);        ... //放入到CACHE中秋        return fs;    }}private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException {    Class
clazz = getFileSystemClass(uri.getScheme(), conf); // 返回的是:org.apache.hadoop.hdfs.DistributedFileSystem FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf); fs.initialize(uri, conf); //初始化DistributedFileSystem return fs;}public static Class
getFileSystemClass(String scheme,Configuration conf) throws IOException { if (!FILE_SYSTEMS_LOADED) { //文件系统是否被加载过,刚开始时为false loadFileSystems(); } Class
clazz = null; if (conf != null) { clazz = (Class
) conf.getClass("fs." + scheme + ".impl", null); //fs.hdfs.impl ,此时我们并没有在core-default.xml和core-site.xml中配置该属性 } if (clazz == null) { clazz = SERVICE_FILE_SYSTEMS.get(scheme); //class org.apache.hadoop.hdfs.DistributedFileSystem } if (clazz == null) { throw new IOException("No FileSystem for scheme: " + scheme); } return clazz;}private static void loadFileSystems() { synchronized (FileSystem.class) { if (!FILE_SYSTEMS_LOADED) { ServiceLoader
serviceLoader = ServiceLoader.load(FileSystem.class); for (FileSystem fs : serviceLoader) { SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass()); } FILE_SYSTEMS_LOADED = true; //标识为已经从系统中加载过 } }}

loadFileSystems后SERVICE_FILE_SYSTEMS存在如下值:

file=class org.apache.hadoop.fs.LocalFileSystem, ftp=class org.apache.hadoop.fs.ftp.FTPFileSystem, hdfs=class org.apache.hadoop.hdfs.DistributedFileSystem, hftp=class org.apache.hadoop.hdfs.web.HftpFileSystem, webhdfs=class org.apache.hadoop.hdfs.web.WebHdfsFileSystem, s3n=class org.apache.hadoop.fs.s3native.NativeS3FileSystem, viewfs=class org.apache.hadoop.fs.viewfs.ViewFileSystem, swebhdfs=class org.apache.hadoop.hdfs.web.SWebHdfsFileSystem, har=class org.apache.hadoop.fs.HarFileSystem, s3=class org.apache.hadoop.fs.s3.S3FileSystem, hsftp=class org.apache.hadoop.hdfs.web.HsftpFileSystem

 

DistributedFileSystem.java

// Excerpt of DistributedFileSystem.java (Hadoop).

/** Key field: any client/server interaction first needs this DFSClient. */
DFSClient dfs;

@Override
public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    setConf(conf);
    // e.g. "hadoop000". NOTE(review): unused in this excerpt; presumably checked in the
    // elided original code — confirm against Hadoop source.
    String host = uri.getHost();
    this.dfs = new DFSClient(uri, conf, statistics);
    // Keep only scheme + authority, e.g. "hdfs://hadoop000:8020".
    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
    this.workingDir = getHomeDirectory();
}

 

DFSClient.java

// Excerpt of DFSClient.java (Hadoop).

/** Key field: the RPC interface through which the client talks to the NameNode. */
final ClientProtocol namenode;

// NOTE(review): DistributedFileSystem.initialize calls new DFSClient(uri, conf, statistics) with
// three arguments; that constructor presumably delegates to this one — confirm against Hadoop
// source. rpcNamenode is not used in this excerpt.
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf,
        FileSystem.Statistics stats) throws IOException {
    NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo =
            NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class);
    this.dtService = proxyInfo.getDelegationTokenService();
    // Concretely an org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.
    this.namenode = proxyInfo.getProxy();
}

 

NameNodeProxies.java

public static 
ProxyAndInfo
createProxy(Configuration conf, URI nameNodeUri, Class
xface) throws IOException { Class
> failoverProxyProviderClass = getFailoverProxyProviderClass(conf, nameNodeUri, xface); return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,UserGroupInformation.getCurrentUser(), true);}public static
ProxyAndInfo
createNonHAProxy(Configuration conf, InetSocketAddress nnAddr, Class
xface, UserGroupInformation ugi, boolean withRetries) throws IOException { Text dtService = SecurityUtil.buildTokenService(nnAddr); T proxy; if (xface == ClientProtocol.class) { proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,withRetries); } ... return new ProxyAndInfo
(proxy, dtService);}private static ClientProtocol createNNProxyWithClientProtocol( InetSocketAddress address, Configuration conf, UserGroupInformation ugi,boolean withRetries) throws IOException { //Client与NameNode的RPC交互接口 final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class); ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy( ClientNamenodeProtocolPB.class, version, address, ugi, conf, NetUtils.getDefaultSocketFactory(conf), org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy) .getProxy(); if (withRetries) { //使用jdk的动态代理创建实例 proxy = (ClientNamenodeProtocolPB) RetryProxy.create( ClientNamenodeProtocolPB.class,new DefaultFailoverProxyProvider
( ClientNamenodeProtocolPB.class, proxy),methodNameToPolicyMap,defaultPolicy); } return new ClientNamenodeProtocolTranslatorPB(proxy);}

 

RetryProxy.java

// Excerpt of RetryProxy.java (Hadoop).

/**
 * Creates a JDK dynamic proxy implementing {@code iface} whose method calls go through a
 * RetryInvocationHandler built from {@code proxyProvider} and {@code retryPolicy}.
 *
 * <p>NOTE(review): createNNProxyWithClientProtocol calls RetryProxy.create with four arguments
 * (including methodNameToPolicyMap), so that call resolves to a different overload than this
 * three-argument one.
 *
 * @param iface interface the returned proxy implements
 * @param proxyProvider supplies the underlying target; its interface's class loader is used
 * @param retryPolicy policy handed to the invocation handler
 * @return the proxy object; callers cast it to {@code iface}
 */
public static <T> Object create(Class<T> iface, FailoverProxyProvider<T> proxyProvider,
        RetryPolicy retryPolicy) {
    return Proxy.newProxyInstance(
            proxyProvider.getInterface().getClassLoader(),
            new Class<?>[] {iface},
            new RetryInvocationHandler<T>(proxyProvider, retryPolicy));
}

 

 获取FileSystem实例源码分析总结:

1、FileSystem.get通过反射实例化了一个DistributedFileSystem;

2、DistributedFileSystem在initialize()中new DFSClient(),并把它作为自己的成员变量dfs;

3、在DFSClient构造方法里面,调用NameNodeProxies.createProxy,通过RPC机制得到一个NameNode的代理对象,之后便可以通过它和NameNode进行通信;

4、整个流程:FileSystem.get() --> DistributedFileSystem.initialize() --> new DFSClient() --> NameNodeProxies.createProxy() --> RPC.getProtocolProxy() --> NameNode的代理(ClientNamenodeProtocolTranslatorPB)。

 

转载地址:http://qngoo.baihongyu.com/

你可能感兴趣的文章
如何有效抓取SQL Server的BLOCKING信息
查看>>
bash中(),{},(()),[],[[]]的区别
查看>>
Oracle PL/SQL匿名块(三)
查看>>
模拟实现strstr
查看>>
解决Office系列安装不上的办法
查看>>
vimdiff的简单使用
查看>>
我的友情链接
查看>>
工作的习惯,看到好收藏下
查看>>
利用ACS来实现AAA服务
查看>>
国内开源镜像站
查看>>
vb.net中东软医保接口的调用
查看>>
java 消息摘要算法MD
查看>>
Web Service security UserNameToken 使用
查看>>
I/O重定向
查看>>
去除vue项目中的#及其ie9兼容性
查看>>
linux实例 批量修改图片文件名
查看>>
day15(mysql 的多表查询,事务)
查看>>
IOS
查看>>
beta冲刺第三天
查看>>
beta第二天
查看>>