HDFS编程记录

简介

本次记录将记载进行HDFS的Java API编程所需要的一些准备情况,同时介绍一些使用频率较高的类的函数。

准备情况

为了导入一个能够与HDFS交互的Java应用程序,需要向java工程中添加以下JAR包:

  1. ”/usr/local/hadoop/share/hadoop/common”目录下的hadoop-common-2.7.1.jar和haoop-nfs-2.7.1.jar;
  2. /usr/local/hadoop/share/hadoop/common/lib”目录下的所有JAR包;
  3. “/usr/local/hadoop/share/hadoop/hdfs”目录下的haoop-hdfs-2.7.1.jar和haoop-hdfs-nfs-2.7.1.jar;
  4. “/usr/local/hadoop/share/hadoop/hdfs/lib”目录下的所有JAR包。

核心类的分析

1.Configuration

使用这个类首先需要在头部引入import org.apache.conf.Configuration;
使用方法:

Configuration conf = new Configuration();

conf.set("fs.defaultFS","hdfs://localhost:9000");

这部分的代码的作用就是链接到hadoop,通过规定好conf我们就可以在FileSystem中使用,基本上是每个与hadoop打交道的代码最开始都要加上的语句。

2.FileSystem

使用这个类需要在头部引入import org.apache.hadoop.fs.FileSyetem;
使用方法:

FileSystem fs = FileSystem.get(conf);

通过已经设置好信息的conf就可以创建一个与自己hadoop链接着的FileSystem,而通过FileSystem就可以获得FSDataInputStream与FSDataOutputStream这两个较为有用的类。

类的函数解析

1.创建新文件

FSDataOutputStream create(Path);

create方法写入文件时会自动创建父目录,不需要显示地创建一个目录,相当于shell中的hadoop fs -put localfile dst
可以利用这个对文件进行编辑操作

2.文件内容追加函数

FSDataOutputStream append(Path);

需要调用append(Path f)获取FSDataOutputStream类对象,即可对已存在的文件进行写入。
注:如果在进行追加时遇到baddatanode的错误可以在conf初始化设置时追加两句:

conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER");
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true");

3.打开(查看)文件

FSDataInputStream open(Path);

调用open()方法可以获得文件的输入流;配合BufferedReader可以做到按行读取,显示文件内容到终端。

4.查看文件系统信息

FileStatus getFileStatus(Path);
FileStatus[] listStatus(Path);.//获取路径下所有文件的FileStatus

相当于shell接口的hadoop fs -ls
利用FileStatus可以查看文件的系统信息

5.递归查看文件系统信息

RemoteIterator<LocatedFileStatus> listFiles(Path,boolean);

这里第二个参数就是判但是否允许递归,如果为true就会把目录中的目的也打开获得里面的文件信息。

6.重命名

boolean rename(Path,Path);//前一个Path是原名,后一个Path是新名字

7.删除文件

boolean delete(Path,boolean);//第二的boolean是判断是否允许递归删除

8.下载文件

void moveToLocalFile(Path,Path);
//第一个Path是hdfs路径,第二个是本地路径
//这个相当于将hdfs文件剪贴到本地

void copyToLocalFile(Path,Path);
//第一个Path是hdfs路径,第二个是本地路径
//这个相当于hdfs文件复制粘贴到本地

9.创建目录

boolean exists(Path);

10.创建文件夹

boolean mkdirs(Path);

3.FSDataInputStream

`
import org.apache.hadoop.fs.FSDataInputStream;
`
用来读取文件内容的一个很有用的类
用法:

FSDataInputStream in = fs.open(remotePath);
BufferedReader d = new BufferedReader(new InputStreamReader(in));
d.readLine();

4.FSDataOutputStream

`
import org.apache.hadoop.fs.FSDataOutputStream;
`
用来编辑文件内容的很有用的一个类
用法:

FSDataOutputStream out = fs.append(remotePath);
out.write(content.getBytes());

5.FileStatus

`
import org.apache.hadoop.fs.FileStatus;
`
用来查看文件的系统信息非常有用的一个类
用法:

System.out.println("路径:" + s.getPath().toString());
System.out.println("权限" + s.getPermission().toString());
System.out.println("大小" + s.getLen());

long timeStamp = s.getModificationTime();
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String date = format.format(timeStamp);
System.out.println("时间:" + date);

通过FileSystem类的getFileStatus(Path)等获得。

代码实例

追加内容

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.*;

import org.apache.hadoop.fs.FileSystem;

import java.io.*;
import java.util.Scanner;

public class AppendFile {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","hdfs://localhost:9000");
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER");
        conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true");
        String remoteFilePath = "/user/caopeng/test.txt";
        String appendContent = "追加内容\n";
        String choice;
        try{
            if(!AppendFile.test(conf,remoteFilePath)){
                System.out.println("文件不存在"+remoteFilePath);
            }else {
                System.out.println("追加在前后在后before|after");
                Scanner sc = new Scanner(System.in);
                choice = sc.next();
                if (choice.equals("after")){
                    AppendFile.appendToFile(conf,appendContent,
                            remoteFilePath);
                    System.out.println("追加到文件尾"+remoteFilePath);
                }else if(choice.equals("before")){
                    //文件下载到本地,新建文件,将追加内容先追加,将源文件内容后追加
                    String localTmpPath = "/Users/caopeng/desktop/tmp.txt";
                    AppendFile.moveToLocal(conf,remoteFilePath,localTmpPath);
                    AppendFile.createFile(conf,remoteFilePath);
                    AppendFile.appendToFile(conf,appendContent,remoteFilePath);
                    AppendFile.fileAppendToFile(conf,localTmpPath,remoteFilePath);
                    System.out.println("追加到文件开头:"+remoteFilePath);
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    //检测路径是否存在
    public static boolean test(Configuration conf, String path) {
        try (FileSystem fs = FileSystem.get(conf)) {
            return fs.exists(new Path(path));
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    //追加文本内容
    public static void appendToFile(Configuration conf, String content,
                                    String remoteFilePath) {
        try (FileSystem fs = FileSystem.get(conf)) {
            Path remotePath = new Path(remoteFilePath);
            FSDataOutputStream out = fs.append(remotePath);
            out.write(content.getBytes());
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //文件内容追加写入文件
    public static void fileAppendToFile(Configuration conf, String localFilePath,
                                        String remoteFilePath) {
        Path remotePath = new Path(remoteFilePath);
        try (FileSystem fs = FileSystem.get(conf);
             FileInputStream in = new FileInputStream(localFilePath);) {
            FSDataOutputStream out = fs.append(remotePath);
            byte[] data = new byte[1024];
            int read = -1;
            while((read = in.read(data))>0){
                out.write(data,0,read);
            }
            out.close();
        }catch (IOException e){
            e.printStackTrace();
        }
    }

    //文件下载
    public static void moveToLocal(Configuration conf,
                                   String remoteFilePath,
                                   String localFilePath){
        try(FileSystem fs = FileSystem.get(conf)){
            Path remotePath = new Path(remoteFilePath);
            Path localPath = new Path(localFilePath);
            fs.moveToLocalFile(remotePath,localPath);
        }catch (IOException e){
            e.printStackTrace();
        }
    }

    //创建文件
    public static void createFile(Configuration conf, String remoteFilePath) {
        Path remotePath = new Path(remoteFilePath);
        try (FileSystem fs = FileSystem.get(conf)) {
            FSDataOutputStream outputStream = fs.create(remotePath);
            outputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

查看文件系统信息

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.*;

import org.apache.hadoop.fs.FileSystem;


import java.io.*;

import java.text.SimpleDateFormat;


public class ShowFileList {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","hdfs://localhost:9000");
        String remoteDir = "/user";

        try{
            System.out.println("读取目录下所有文件的信息:"+remoteDir);
            ShowFileList.showList(conf,remoteDir);
            System.out.println("读取完成");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    public static void showList(Configuration conf, String remoteDir) {
        try (FileSystem fs = FileSystem.get(conf)) {
            Path dirPath = new Path(remoteDir);
            RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(dirPath,true);

            while (remoteIterator.hasNext()){
                FileStatus s = remoteIterator.next();
                System.out.println("路径:"+s.getPath().toString());
                System.out.println("权限:"+s.getPermission().toString());
                System.out.println("大小"+s.getLen());
                Long timeStamp = s.getModificationTime();
                SimpleDateFormat format = new SimpleDateFormat(
                        "YYYY-MM-DD HH:MM:SS"
                );
                String date = format.format(timeStamp);
                System.out.println("时间"+date+"\n");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

看文件

import java.io.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

public class Cat {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        String remoteFilePath = "/user/caopeng/test.txt";
        try {
            System.out.println("读取文件:" + remoteFilePath);
            Cat.Cat(conf, remoteFilePath);
            System.out.println("\n读取完成");
        } catch (Exception a) {
            a.printStackTrace();
        }
    }

    public static void Cat(Configuration conf, String remoteFilePath) {
        Path remotePath = new Path(remoteFilePath);
        try (FileSystem fs = FileSystem.get(conf);
             FSDataInputStream in = fs.open(remotePath);
             BufferedReader d = new BufferedReader(new InputStreamReader(in));) {
            String line;
            while ((line = d.readLine()) != null) {
                System.out.println(line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
最后修改于:2021年03月26日 16:50

添加新评论