HDFS案例代码
Configuration configuration = new Configuration();FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop000:8020"), configuration); InputStream in = fileSystem.open(new Path(HDFS_PATH+"/hdfsapi/test/log4j.properties"));OutputStream out = new FileOutputStream(new File("log4j_download.properties"));IOUtils.copyBytes(in, out, 4096, true); //最后一个参数表示完成拷贝之后关闭输入/出流
FileSystem.java
static final Cache CACHE = new Cache();public static FileSystem get(URI uri, Configuration conf) throws IOException { String scheme = uri.getScheme(); //hdfs String authority = uri.getAuthority(); //hadoop000:8020 return CACHE.get(uri, conf);}FileSystem get(URI uri, Configuration conf) throws IOException{ Key key = new Key(uri, conf); return getInternal(uri, conf, key);}private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{ FileSystem fs; synchronized (this) { fs = map.get(key); } //根据URI取得一个FileSystem实例,如果允许缓存,会中从缓存中取出,否则将调用createFileSystem创建一个新实例 if (fs != null) { return fs; } fs = createFileSystem(uri, conf); synchronized (this) { FileSystem oldfs = map.get(key); ... //放入到CACHE中秋 return fs; }}private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException { Class clazz = getFileSystemClass(uri.getScheme(), conf); // 返回的是:org.apache.hadoop.hdfs.DistributedFileSystem FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf); fs.initialize(uri, conf); //初始化DistributedFileSystem return fs;}public static Class getFileSystemClass(String scheme,Configuration conf) throws IOException { if (!FILE_SYSTEMS_LOADED) { //文件系统是否被加载过,刚开始时为false loadFileSystems(); } Class clazz = null; if (conf != null) { clazz = (Class ) conf.getClass("fs." + scheme + ".impl", null); //fs.hdfs.impl ,此时我们并没有在core-default.xml和core-site.xml中配置该属性 } if (clazz == null) { clazz = SERVICE_FILE_SYSTEMS.get(scheme); //class org.apache.hadoop.hdfs.DistributedFileSystem } if (clazz == null) { throw new IOException("No FileSystem for scheme: " + scheme); } return clazz;}private static void loadFileSystems() { synchronized (FileSystem.class) { if (!FILE_SYSTEMS_LOADED) { ServiceLoaderserviceLoader = ServiceLoader.load(FileSystem.class); for (FileSystem fs : serviceLoader) { SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass()); } FILE_SYSTEMS_LOADED = true; //标识为已经从系统中加载过 } }}
loadFileSystems后SERVICE_FILE_SYSTEMS存在如下值:
file=class org.apache.hadoop.fs.LocalFileSystem, ftp=class org.apache.hadoop.fs.ftp.FTPFileSystem, hdfs=class org.apache.hadoop.hdfs.DistributedFileSystem, hftp=class org.apache.hadoop.hdfs.web.HftpFileSystem, webhdfs=class org.apache.hadoop.hdfs.web.WebHdfsFileSystem, s3n=class org.apache.hadoop.fs.s3native.NativeS3FileSystem, viewfs=class org.apache.hadoop.fs.viewfs.ViewFileSystem, swebhdfs=class org.apache.hadoop.hdfs.web.SWebHdfsFileSystem, har=class org.apache.hadoop.fs.HarFileSystem, s3=class org.apache.hadoop.fs.s3.S3FileSystem, hsftp=class org.apache.hadoop.hdfs.web.HsftpFileSystem
DistributedFileSystem.java
DFSClient dfs; //重点属性:客户端与服务端交互操作需要先拿到DFSClient@Overridepublic void initialize(URI uri, Configuration conf) throws IOException { super.initialize(uri, conf); setConf(conf); String host = uri.getHost(); //hadoop000 this.dfs = new DFSClient(uri, conf, statistics); this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority()); this.workingDir = getHomeDirectory();}
DFSClient.java
final ClientProtocol namenode; //重点属性:客户端与NameNode通信的PRC接口public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats)throws IOException { NameNodeProxies.ProxyAndInfoproxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,ClientProtocol.class); this.dtService = proxyInfo.getDelegationTokenService(); this.namenode = proxyInfo.getProxy(); //org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB}
NameNodeProxies.java
public staticProxyAndInfo createProxy(Configuration conf, URI nameNodeUri, Class xface) throws IOException { Class > failoverProxyProviderClass = getFailoverProxyProviderClass(conf, nameNodeUri, xface); return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,UserGroupInformation.getCurrentUser(), true);}public static ProxyAndInfo createNonHAProxy(Configuration conf, InetSocketAddress nnAddr, Class xface, UserGroupInformation ugi, boolean withRetries) throws IOException { Text dtService = SecurityUtil.buildTokenService(nnAddr); T proxy; if (xface == ClientProtocol.class) { proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,withRetries); } ... return new ProxyAndInfo (proxy, dtService);}private static ClientProtocol createNNProxyWithClientProtocol( InetSocketAddress address, Configuration conf, UserGroupInformation ugi,boolean withRetries) throws IOException { //Client与NameNode的RPC交互接口 final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class); ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy( ClientNamenodeProtocolPB.class, version, address, ugi, conf, NetUtils.getDefaultSocketFactory(conf), org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy) .getProxy(); if (withRetries) { //使用jdk的动态代理创建实例 proxy = (ClientNamenodeProtocolPB) RetryProxy.create( ClientNamenodeProtocolPB.class,new DefaultFailoverProxyProvider ( ClientNamenodeProtocolPB.class, proxy),methodNameToPolicyMap,defaultPolicy); } return new ClientNamenodeProtocolTranslatorPB(proxy);}
RetryProxy.java
public staticObject create(Class iface,FailoverProxyProvider proxyProvider, RetryPolicy retryPolicy) { return Proxy.newProxyInstance( proxyProvider.getInterface().getClassLoader(), new Class [] { iface }, new RetryInvocationHandler (proxyProvider, retryPolicy) );}
获取FileSystem实例源码分析总结:
1、FileSystem.get通过反射实例化了一个DistributedFileSystem;
2、DistributedFileSystem中new DFSCilent()把他作为自己的成员变量;
3、在DFSClient构造方法里面,调用了createProxy使用RPC机制得到了一个NameNode的代理对象,就可以和NameNode进行通信;
4、整个流程:FileSystem.get()--> DistributedFileSystem.initialize() --> DFSClient(RPC.getProtocolProxy()) --> NameNode的代理。