`
wangzjie
  • 浏览: 72516 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Hadoop 新版api中的JobControl实现

阅读更多

依赖关系组合式MapReduce

 

答案是采取JobControl,直接上代码。

JobControl依赖关系组合式MpaReduce。

旧版实现:org.apache.hadoop.mapred包下

Configuration job1conf = new Configuration();
Job job1 = new Job(job1conf,"Job1");
.........//job1 其他设置
Configuration job2conf = new Configuration();
Job job2 = new Job(job2conf,"Job2");
.........//job2 其他设置
Configuration job3conf = new Configuration();
Job job3 = new Job(job3conf,"Job3");
.........//job3 其他设置
job3.addDepending(job1);//设置job3和job1的依赖关系
job3.addDepending(job2);
JobControl JC = new JobControl("123");
JC.addJob(job1);//把三个job加入到jobcontorl中
JC.addJob(job2);
JC.addJob(job3);
JC.run();

 新版api实现:org.apache.hadoop.mapreduce.lib.jobcontrol.*(在hadoop 0.20.2还没有,hadoop 1.x已经有了)

使用该包下的ControlledJob与JobControl

/** 
     * job2 依赖于 job1 
     * @param job1 
     * @param job2 
     * @param chainName 
     * @return 
     * @throws IOException 
     */  
    public static int handleJobChain(Job job1 ,Job job2, String chainName) throws IOException{  
        ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration());
        controlledJob1.setJob(job1);         
        ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration()) 
        controlledJob2.setJob(job2);  
        controlledJob2.addDependingJob(controlledJob1);          
        JobControl jc = new JobControl(chainName);  
        jc.addJob(controlledJob1);  
        jc.addJob(controlledJob2);  
        Thread jcThread = new Thread(jc);  
        jcThread.start();  
        while(true){  
            if(jc.allFinished()){  
                System.out.println(jc.getSuccessfulJobList());  
                jc.stop();  
                return 0;  
            }  
            if(jc.getFailedJobList().size() > 0){  
                System.out.println(jc.getFailedJobList());  
                jc.stop();  
                return 1;  
            }  
        }  
    } 

 

要注意的地方就是hadoop的JobControl类实现了线程Runnable接口。我们需要实例化一个线程来让它启动。直接调用JobControl的run()方法,线程将无法结束。要调用JobControl.stop()才能停止

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics