atom feed | 1 message in org.apache.incubator.accumulo-commits | svn commit: r1292032 - in /incubator/...
From | Sent On | Attachments
ktur...@apache.org | Feb 21, 2012 1:14 pm |
Subject: svn commit: r1292032 - in /incubator/accumulo/trunk: ./ src/core/ src/server/ src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
From: ktur...@apache.org (ktur@apache.org)
Date: Feb 21, 2012 1:14:47 pm
List: org.apache.incubator.accumulo-commits

Author: kturner
Date: Tue Feb 21 21:14:46 2012
New Revision: 1292032

URL: http://svn.apache.org/viewvc?rev=1292032&view=rev
Log:
ACCUMULO-422 Fixed two bugs caused by tserver dying during bulk import (merged from 1.4)

Modified:
    incubator/accumulo/trunk/ (props changed)
    incubator/accumulo/trunk/src/core/ (props changed)
    incubator/accumulo/trunk/src/server/ (props changed)
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java

Propchange: incubator/accumulo/trunk/

------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Feb 21 21:14:46 2012 @@ -1,3 +1,3 @@ /incubator/accumulo/branches/1.3:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611,1228195,1230180,1230736,1231043,1236873,1245632 /incubator/accumulo/branches/1.3.5rc:1209938 -/incubator/accumulo/branches/1.4:1201902-1245824 +/incubator/accumulo/branches/1.4:1201902-1292029

Propchange: incubator/accumulo/trunk/src/core/

------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Feb 21 21:14:46 2012 @@ -1,3 +1,3 @@ -/incubator/accumulo/branches/1.3/src/core:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215 /incubator/accumulo/branches/1.3.5rc/src/core:1209938 -/incubator/accumulo/branches/1.4/src/core:1201902-1245824 +/incubator/accumulo/branches/1.3/src/core:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215 +/incubator/accumulo/branches/1.4/src/core:1201902-1292029

Propchange: incubator/accumulo/trunk/src/server/

------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Feb 21 21:14:46 2012 @@ -1,3 +1,3 @@ -/incubator/accumulo/branches/1.3/src/server:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611 /incubator/accumulo/branches/1.3.5rc/src/server:1209938 -/incubator/accumulo/branches/1.4/src/server:1201902-1245824 +/incubator/accumulo/branches/1.3/src/server:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611 +/incubator/accumulo/branches/1.4/src/server:1201902-1292029

Modified:
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java URL:
http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1292032&r1=1292031&r2=1292032&view=diff ============================================================================== ---
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
(original) +++
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
Tue Feb 21 21:14:46 2012 @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -57,6 +58,7 @@ import org.apache.accumulo.server.Server import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.fate.Repo; +import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; import org.apache.accumulo.server.master.Master; import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.security.SecurityConstants; @@ -71,6 +73,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.MapFile; import org.apache.log4j.Logger; import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException;

import cloudtrace.instrument.TraceExecutorService;

@@ -110,7 +113,6 @@ public class BulkImport extends MasterRe this.sourceDir = sourceDir; this.errorDir = errorDir; this.setTime = setTime; - log.debug(this.getDescription()); }

@Override @@ -283,7 +285,8 @@ class CleanUpBulkImport extends MasterRe Set<TServerInstance> running = master.onlineTabletServers(); for (TServerInstance server : running) { try { - if (!master.getConnection(server).isActive(tid)) + TServerConnection client = master.getConnection(server); + if (client != null && !client.isActive(tid)) finished.add(server); } catch (TException ex) { log.info("Ignoring error trying to check on tid " + tid + " from server
" + server + ": " + ex); @@ -374,6 +377,13 @@ class LoadFiles extends MasterRepo { }

@Override + public long isReady(long tid, Master master) throws Exception { + if (master.onlineTabletServers().size() == 0) + return 500; + return 0; + } + + @Override public Repo<Master> call(final long tid, Master master) throws Exception { final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration(); FileSystem fs =
TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(), @@ -383,7 +393,7 @@ class LoadFiles extends MasterRepo { files.add(entry); } log.debug("tid " + tid + " importing " + files.size() + " files"); - + Path writable = new Path(this.errorDir, ".iswritable"); if (!fs.createNewFile(writable)) { // Maybe this is a re-try... clear the flag and try again @@ -399,16 +409,22 @@ class LoadFiles extends MasterRepo { filesToLoad.add(f.getPath().toString());

+ final Map<String,Long> blackList = Collections.synchronizedMap(new
HashMap<String,Long>()); + final int RETRIES = Math.max(1,
conf.getCount(Property.MASTER_BULK_RETRIES)); for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0;
attempt++) { List<Future<List<String>>> results = new
ArrayList<Future<List<String>>>();

// Figure out which files will be sent to which server - Set<TServerInstance> currentServers = Collections.synchronizedSet(new
HashSet<TServerInstance>(master.onlineTabletServers())); - Map<String,List<String>> loadAssignments = new
HashMap<String,List<String>>(); - for (TServerInstance server : currentServers) { - loadAssignments.put(server.hostPort(), new ArrayList<String>()); + Map<String,List<String>> loadAssignments = initializeLoadAssignments(tid,
master, conf, blackList); + if (loadAssignments.size() == 0) + log.warn("There are no tablet server to process bulk import, waiting
(tid = " + tid + ")"); + + while (loadAssignments.size() == 0) { + UtilWaitThread.sleep(500); + loadAssignments = initializeLoadAssignments(tid, master, conf,
blackList); } + int i = 0; List<Entry<String,List<String>>> entries = new
ArrayList<Entry<String,List<String>>>(loadAssignments.entrySet()); for (String file : filesToLoad) { @@ -442,8 +458,11 @@ class LoadFiles extends MasterRepo { failures.addAll(fail); } } + } catch (TTransportException tte) { + log.warn("blacklisting server " + finalEntry.getKey() + " tid " +
tid + " " + tte, tte); + blackList.put(finalEntry.getKey(), System.currentTimeMillis()); } catch (Exception ex) { - log.error(ex, ex); + log.error("rpc failed, server " + finalEntry.getKey() + " tid " +
tid + " " + ex, ex); } finally { ServerClient.close(client); } @@ -455,7 +474,7 @@ class LoadFiles extends MasterRepo { for (Future<List<String>> f : results) failures.addAll(f.get()); if (filesToLoad.size() > 0) { - log.debug("tid " + tid + " attempt " + (i + 1) + " " +
sampleList(filesToLoad, 10) + " failed"); + log.debug("tid " + tid + " attempt " + (attempt + 1) + " " +
sampleList(filesToLoad, 10) + " failed"); UtilWaitThread.sleep(100); } } @@ -480,6 +499,29 @@ class LoadFiles extends MasterRepo { return new CompleteBulkImport(tableId, source, bulk, errorDir); }

+ private Map<String,List<String>> initializeLoadAssignments(final long tid,
Master master, final SiteConfiguration conf, final Map<String,Long> blackList) { + + // remove servers from black list that have been there a while + Iterator<Entry<String,Long>> bliter = blackList.entrySet().iterator(); + long zkTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); + while (bliter.hasNext()) { + Entry<String,Long> blentry = bliter.next(); + if (System.currentTimeMillis() - blentry.getValue() > zkTimeout * 2) { + log.debug("Removing server from blacklist " + blentry.getKey() + " tid
" + tid); + bliter.remove(); + } + } + + Set<TServerInstance> currentServers = new
HashSet<TServerInstance>(master.onlineTabletServers()); + Map<String,List<String>> loadAssignments = new
HashMap<String,List<String>>(); + for (TServerInstance server : currentServers) { + loadAssignments.put(server.hostPort(), new ArrayList<String>()); + } + + loadAssignments.keySet().removeAll(blackList.keySet()); + return loadAssignments; + } + static String sampleList(Collection<?> potentiallyLongList, int max) { StringBuffer result = new StringBuffer(); result.append("[");