有多个以时间戳命名(如 201704171735、201704171740)并按此排序的文件,需要将这些文件按天(如 20170417)合并:
/*
* 按天合并文件
*/
public class MergerDayFile {
private static final Log LOG = LogFactory.getLog(MergerDayFile.class);
1 |
public static void main(String[] args) throws Exception { Properties properties = new Properties(); // 读取配置文件"./config/mergerFile.properties" FileInputStream in = new FileInputStream("./config/mergerFile.properties"); properties.load(in); String dest = properties.getProperty("dest"); String src1 = properties.getProperty("src1"); String url = properties.getProperty("url"); // 相同文件按天合并,合并到指定位置 mergerfile(dest, src1, url); try { in.close(); } catch (Exception e) { // TODO: handle exception } } private static void mergerfile(String destPath, String src1, String url) throws Exception { Configuration conf = new Configuration(); // HashMap<String, List<Path>> map = new HashMap<String, List<Path>>(); FileSystem fs = FileSystem.get(new URI(url), conf, "root"); FileStatus[] allFiles = fs.listStatus(new Path(src1)); String lastFile = null; String currentFile = null; List<Path> mergerDay = new ArrayList<Path>(); for (int i = 0; i < allFiles.length; i++) { FileStatus file = allFiles[i]; Path path = file.getPath(); String name = path.getName(); // 解析文件名 currentFile = parseTime(name); // 解析上一次文件名 String lastFileName = parseTime(lastFile); // 第一次结果判断 if (lastFile == null) { mergerDay.add(path); lastFile = currentFile; } else if (i == allFiles.length - 1) { mergerData(destPath, mergerDay, fs); mergerDay.clear(); mergerDay.add(path); mergerData(destPath, mergerDay, fs); } else if (currentFile.equals(lastFileName)) { mergerDay.add(path); } else { mergerData(destPath, mergerDay, fs); mergerDay.clear(); mergerDay.add(path); lastFile = currentFile; } } fs.close(); } private static void mergerData(String destPath, List<Path> mergerDay, FileSystem fs) { Path key = mergerDay.get(0); String name = key.getName(); try { FSDataOutputStream out = fs.create(new Path(destPath + name)); for (Path path : mergerDay) { FSDataInputStream open = fs.open(path); IOUtils.copyBytes(open, out, 8012, false); open.close(); } LOG.info(name + "合并成功"); out.close(); } catch (IOException e) { LOG.error(name + 
"合并失败", e); e.printStackTrace(); } } // 按天合并文件 private static String parseTime(String name) { String day = null; try { String[] datas = name.split("\\.", 2); String date = datas[0]; day = date.substring(0, 8); } catch (Exception e) { } return day; } |