2012/11/30

in memory of pds

These are busy times, the end of the world is coming closer quickly. Maybe I should go out on the streets with a big sign stating Repent sinners before it is too late ! Be saved, use NetKernel !

Actually the Mayans didn't say anything about the end of the world, but it would definitely be one of the more interesting publicity campaigns.

Right, no more philosophy this week. In his Tic-Tac-Toe series Peter Rodgers bumps into the problem that the available in memory implementation of the PDS accessor doesn't quite implement all the functionality that a PDS accessor should have. He then quickly skips to using the H2 backed implementation.

While that of course works (the H2 implementation) I noticed in my Connect Four implementation that things are not as snappy as they should be for a game. Especially if the board gets bigger, the game becomes database bound.

So, here is my implementation of the in memory PDS accessor. It uses the golden thread pattern for expiry. Enjoy !

// The usual suspects for an accessor
import org.netkernel.layer0.meta.impl.SourcedArgumentMetaImpl;
import org.netkernel.layer0.nkf.*;
import org.netkernel.module.standard.endpoint.StandardAccessorImpl;

// Processing
import org.netkernel.layer0.representation.IHDSNode;
import org.netkernel.layer0.representation.IHDSNodeList;
import org.netkernel.layer0.representation.IReadableBinaryStreamRepresentation;
import org.netkernel.layer0.representation.impl.HDSBuilder;
import org.netkernel.request.IRequestResponseFields;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InMemoryPDSAccessor extends StandardAccessorImpl {
    private static ConcurrentHashMap<String, InMemoryResource> mResources;
   
    private static class InMemoryResource {
        private final Object mRepresentation;
        private final IRequestResponseFields mUserMetaData;

        @SuppressWarnings("rawtypes")
        public InMemoryResource(INKFResponseReadOnly aResponse) {      
            if (aResponse != null) {  
                mRepresentation = aResponse.getRepresentation();
                mUserMetaData = aResponse.getHeaders();
            }
            else {  
                mRepresentation = null;
                mUserMetaData = null;
            }          
        }
      
        public Object getRepresentation() {  
            return mRepresentation;
        }
      
        public IRequestResponseFields getUserMetaData(){
            return mUserMetaData;
        }
    }

    public static class PDSArguments {
        public String mInstance;
        public String mZone;
        public String mIdentifier;
      
        private PDSArguments(INKFRequestContext aContext) throws NKFException {
            // Verify instance
            if (aContext.getThisRequest().argumentExists("instance")) {
                mInstance = aContext.getThisRequest().getArgumentValue("instance");
                if (mInstance.equals("pbv:instance")) {
                    mInstance = aContext.source("arg:instance",String.class);
                }
            }
            else {
                throw new NKFException("request does not have the required - instance - argument");
            }

            // Verify zone          
            if (aContext.getThisRequest().argumentExists("zone")) {
                mZone = aContext.getThisRequest().getArgumentValue("zone");
                if (mZone.equals("pbv:zone")) {
                    mZone = aContext.source("arg:zone",String.class);
                }
            }
            else {
                throw new NKFException("request does not have the required - zone - argument");
            }
            if (mZone.equals("")) {
                throw new NKFException("request does not have a valid - zone - argument");
            }          
          
            // Verify identifier
            if (aContext.getThisRequest().argumentExists("pds")) {
                mIdentifier = aContext.getThisRequest().getArgumentValue("pds");
                if (mIdentifier.equals("pbv:pds")) {
                    mIdentifier = aContext.source("arg:pds",String.class);
                }
                if (! mIdentifier.startsWith("/")) {
                    mIdentifier = "/" + mIdentifier;
                }
            }
            else {
                throw new NKFException("request does not have the required - identifier - argument");
            }
            if (mIdentifier.equals("")) {
                throw new NKFException("request does not have a valid - identifier - argument");
            }
        }

        private PDSArguments(String aInstance, String aZone, String aIdentifier) {
            mInstance = aInstance;
            mZone = aZone;
            mIdentifier = aIdentifier;
        }
      
        public Boolean isSet() {
            return mIdentifier.endsWith("/");
        }
      
        public String getCombined() {
            return mInstance + ":" + mZone + ":" + mIdentifier;
        }
      
        public String getGoldenThread() {
            return "gt:pds:" + mInstance + ":" + mZone + ":" + mIdentifier;
        }
      
        public String getIdentifier() {
            return mIdentifier;
        }

        public String getZone() {
            return mZone;
        }      

        public String getInstance() {
            return mInstance;
        }

        public boolean equals(Object aObject) {
            boolean vResult=false;
          
            if (aObject instanceof PDSArguments) {
                PDSArguments vOther = (PDSArguments) aObject;
                vResult = (mIdentifier.equals(vOther.mIdentifier) && mZone.equals(vOther.mZone) && mInstance.equals(vOther.mInstance));
            }
            return vResult;          
        }
      
        public int hashCode() {
            return mInstance.hashCode() ^ mZone.hashCode() ^ mIdentifier.hashCode();
        }
    }

    public InMemoryPDSAccessor() {
        this.declareThreadSafe();
        this.declareSourceRepresentation(IReadableBinaryStreamRepresentation.class);
        this.declareSourceRepresentation(IHDSNode.class);
        this.declareInhibitCheckForBadExpirationOnMutableResource();
        this.declareArgument(new SourcedArgumentMetaImpl("instance",null,null,new Class[] {String.class}));
        this.declareArgument(new SourcedArgumentMetaImpl("zone",null,null,new Class[] {String.class}));
        this.declareArgument(new SourcedArgumentMetaImpl("pds",null,null,new Class[] {String.class}));
        mResources = new ConcurrentHashMap<String, InMemoryResource>();
    }

    public void onSource(INKFRequestContext aContext) throws Exception {
        // SOURCE requires three arguments, instance zone and pds
      
        PDSArguments aArguments = new PDSArguments(aContext);
      
        if (aArguments.isSet()) {
            HDSBuilder vSet = new HDSBuilder();
            vSet.pushNode("set");

            for(Map.Entry<String, InMemoryResource> vEntry: mResources.entrySet()) {
                if (vEntry.getKey().startsWith(aArguments.getCombined())) {
                    int i = vEntry.getKey().indexOf(aArguments.getIdentifier());
                    String vPDS = vEntry.getKey().substring(i);
                    vSet.addNode("identifier", "pds:" + vPDS);          
                    vSet.pushNode("pds");
                    vSet.addNode("instance", aArguments.getInstance());
                    vSet.addNode("zone", aArguments.getZone());
                    vSet.addNode("pds", vPDS);
                    vSet.popNode();
                }
            }

            vSet.popNode();
          
            INKFRequest subrequest = aContext.createRequest("active:attachGoldenThread");
            subrequest.addArgument("id", aArguments.getGoldenThread());      
            aContext.issueRequest(subrequest);
          
            aContext.createResponseFrom(vSet.getRoot());
        }
        else {
            InMemoryResource vResource = mResources.get(aArguments.getCombined());
          
            if (vResource != null) {
                INKFRequest subrequest = aContext.createRequest("active:attachGoldenThread");
                subrequest.addArgument("id", aArguments.getGoldenThread());      
                aContext.issueRequest(subrequest);

                INKFResponse vResponse = aContext.createResponseFrom(vResource.getRepresentation());
                if (vResource.getUserMetaData() != null) {
                    vResponse.setHeaders(vResource.getUserMetaData());
                }
            }
            else {
                // default response is null
            }          
        }
    }
   
    public void onExists(INKFRequestContext aContext) throws Exception {
        // SOURCE requires three arguments, instance zone and pds
      
        PDSArguments aArguments = new PDSArguments(aContext);

        INKFResponse vResponse;

        if (aArguments.isSet()) {
            Boolean vResult = false;
            for(Map.Entry<String, InMemoryResource> vEntry: mResources.entrySet()) {
                if (vEntry.getKey().startsWith(aArguments.getCombined())) {
                    vResult = true;
                }
            }
            vResponse = aContext.createResponseFrom(vResult);
            if (vResult) {              
                INKFRequest subrequest = aContext.createRequest("active:attachGoldenThread");
                subrequest.addArgument("id", aArguments.getGoldenThread());      
                aContext.issueRequest(subrequest);
            }
            else {
                vResponse.setExpiry(INKFResponse.EXPIRY_ALWAYS);                          
            }
        }
        else {
            if (mResources.containsKey(aArguments.getCombined())) {
              
                INKFRequest subrequest = aContext.createRequest("active:attachGoldenThread");
                subrequest.addArgument("id", aArguments.getGoldenThread());      
                aContext.issueRequest(subrequest);
              
                vResponse = aContext.createResponseFrom(true);          
            }
            else {
                vResponse = aContext.createResponseFrom(false);
                vResponse.setExpiry(INKFResponse.EXPIRY_ALWAYS);          
            }          
        }

    }
   
    public synchronized void onSink(INKFRequestContext aContext) throws Exception {
        // Important : the onSink is synchronized ... only one at a time
        // SINK requires three arguments, instance, zone and pds and will persist
        // the primary argument
      
        PDSArguments aArguments = new PDSArguments(aContext);

        if (aArguments.isSet()) {
            throw new NKFException("unable to SINK to a set");
        }

        INKFRequest subrequest = aContext.createRequest("active:cutGoldenThread");
        subrequest.addArgument("id", aArguments.getGoldenThread());      
        aContext.issueRequest(subrequest);

        @SuppressWarnings("rawtypes")
        INKFResponseReadOnly vPrimary = aContext.getThisRequest().getPrimaryAsResponse();
        InMemoryResource vResource = new InMemoryResource(vPrimary);
        mResources.put(aArguments.getCombined(), vResource);
    }
   
    public void onDelete(INKFRequestContext aContext) throws Exception {
        // SOURCE requires three arguments, instance zone and pds
      
        PDSArguments aArguments = new PDSArguments(aContext);

        if (aArguments.isSet()) {
            IHDSNode vSet = null;
          
            INKFRequest subrequest = aContext.createRequest("active:pds");
            subrequest.addArgument("instance", aArguments.getInstance());
            subrequest.addArgument("zone", aArguments.getZone());
            subrequest.addArgument("pds", aArguments.getIdentifier());
            subrequest.setVerb(INKFRequestReadOnly.VERB_SOURCE);
            subrequest.setRepresentationClass(IHDSNode.class);
          
            vSet = (IHDSNode)aContext.issueRequest(subrequest);
            IHDSNodeList vNodes = vSet.getNodes("/set/pds");
            for (IHDSNode vNode : vNodes) {
                subrequest = aContext.createRequest("active:pds");
                subrequest.addArgument("instance", aArguments.getInstance());
                subrequest.addArgument("zone", aArguments.getZone());
                subrequest.addArgumentByValue("pds", vNode.getFirstValue("pds"));
                subrequest.setVerb(INKFRequestReadOnly.VERB_DELETE);
                subrequest.setRepresentationClass(Boolean.class);
                aContext.issueRequest(subrequest);
            }
            subrequest = aContext.createRequest("active:cutGoldenThread");
            subrequest.addArgument("id", aArguments.getGoldenThread());      
            aContext.issueRequest(subrequest);
          
            INKFResponse vResponse = aContext.createResponseFrom((vNodes != null));
            vResponse.setExpiry(INKFResponse.EXPIRY_ALWAYS);                      
        }
        else {
            INKFRequest subrequest = aContext.createRequest("active:cutGoldenThread");
            subrequest.addArgument("id", aArguments.getGoldenThread());      
            aContext.issueRequest(subrequest);
          
            InMemoryResource vResource = mResources.remove(aArguments.getCombined());
          
            INKFResponse vResponse = aContext.createResponseFrom((vResource != null));
            vResponse.setExpiry(INKFResponse.EXPIRY_ALWAYS);          
        }
    }
}