Logo Search packages:      
Sourcecode: zeroc-ice-java version File versions  Download package

ThreadPool.java

// **********************************************************************
//
// Copyright (c) 2003-2006 ZeroC, Inc. All rights reserved.
//
// This copy of Ice is licensed to you under the terms described in the
// ICE_LICENSE file included in this distribution.
//
// **********************************************************************

package IceInternal;

public final class ThreadPool
{
    private final static boolean TRACE_REGISTRATION = false;
    private final static boolean TRACE_INTERRUPT = false;
    private final static boolean TRACE_SHUTDOWN = false;
    private final static boolean TRACE_SELECT = false;
    private final static boolean TRACE_EXCEPTION = false;
    private final static boolean TRACE_THREAD = false;
    private final static boolean TRACE_STACK_TRACE = false;

    public
    ThreadPool(Instance instance, String prefix, int timeout)
    {
        _instance = instance;
        _destroyed = false;
        _prefix = prefix;
        _timeout = timeout;
      _threadIndex = 0;
      _running = 0;
      _inUse = 0;
      _load = 1.0;
      _promote = true;
      _warnUdp = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0;

      //
      // If we are in thread per connection mode, no thread pool should
      // ever be created.
      //
      assert(!_instance.threadPerConnection());

      String programName = _instance.initializationData().properties.getProperty("Ice.ProgramName");
        if(programName.length() > 0)
        {
            _programNamePrefix = programName + "-";
        }
      else
      {
          _programNamePrefix = "";
      }

      Network.SocketPair pair = Network.createPipe();
        _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source;
        _fdIntrWrite = pair.sink;

        try
        {
            _selector = java.nio.channels.Selector.open();
            pair.source.configureBlocking(false);
            _fdIntrReadKey = pair.source.register(_selector, java.nio.channels.SelectionKey.OP_READ);
        }
        catch(java.io.IOException ex)
        {
            Ice.SyscallException sys = new Ice.SyscallException();
            sys.initCause(ex);
            throw sys;
        }

        //
        // The Selector holds a Set representing the selected keys. The
        // Set reference doesn't change, so we obtain it once here.
        //
        _keys = _selector.selectedKeys();

      //
      // We use just one thread as the default. This is the fastest
      // psossible setting, still allows one level of nesting, and
      // doesn't require to make the servants thread safe.
      //
      int size = _instance.initializationData().properties.getPropertyAsIntWithDefault(_prefix + ".Size", 1);
      if(size < 1)
      {
          String s = _prefix + ".Size < 1; Size adjusted to 1";
          _instance.initializationData().logger.warning(s);
          size = 1;
      }           

      int sizeMax = 
          _instance.initializationData().properties.getPropertyAsIntWithDefault(_prefix + ".SizeMax", size);
      if(sizeMax < size)
      {
          String s = _prefix + ".SizeMax < " + _prefix + ".Size; SizeMax adjusted to Size (" + size + ")";
          _instance.initializationData().logger.warning(s);
          sizeMax = size;
      }
            
      int sizeWarn = _instance.initializationData().properties.getPropertyAsIntWithDefault(
                                                _prefix + ".SizeWarn", sizeMax * 80 / 100);
      if(sizeWarn > sizeMax)
      {
          String s = _prefix + ".SizeWarn > " + _prefix + ".SizeMax; adjusted SizeWarn to SizeMax (" + sizeMax + ")";
          _instance.initializationData().logger.warning(s);
          sizeWarn = sizeMax;
      }

      _size = size;
      _sizeMax = sizeMax;
      _sizeWarn = sizeWarn;
      
      try
        {
            _threads = new java.util.ArrayList();
            for(int i = 0; i < _size; i++)
            {
            EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" +
                                                   _threadIndex++);
                _threads.add(thread);
            thread.start();
            ++_running;
            }
        }
        catch(RuntimeException ex)
        {
          java.io.StringWriter sw = new java.io.StringWriter();
          java.io.PrintWriter pw = new java.io.PrintWriter(sw);
          ex.printStackTrace(pw);
          pw.flush();
          String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString();
          _instance.initializationData().logger.error(s);

            destroy();
          joinWithAllThreads();
            throw ex;
        }
    }

    protected synchronized void
    finalize()
        throws Throwable
    {
        IceUtil.Assert.FinalizerAssert(_destroyed);
    }

    public synchronized void
    destroy()
    {
        if(TRACE_SHUTDOWN)
        {
            trace("destroy");
        }

        assert(!_destroyed);
      assert(_handlerMap.isEmpty());
      assert(_changes.isEmpty());
        _destroyed = true;
        setInterrupt();
    }

    public synchronized void
    _register(java.nio.channels.SelectableChannel fd, EventHandler handler)
    {
        if(TRACE_REGISTRATION)
        {
            trace("adding handler of type " + handler.getClass().getName() + " for channel " + fd);
        }
      assert(!_destroyed);
        _changes.add(new FdHandlerPair(fd, handler));
        setInterrupt();
    }

    public synchronized void
    unregister(java.nio.channels.SelectableChannel fd)
    {
        if(TRACE_REGISTRATION)
        {
            if(TRACE_STACK_TRACE)
            {
                java.io.StringWriter sw = new java.io.StringWriter();
                try
                {
                    throw new RuntimeException();
                }
                catch(RuntimeException ex)
                {
                    java.io.PrintWriter pw = new java.io.PrintWriter(sw);
                    ex.printStackTrace(pw);
                    pw.flush();
                }
                trace("removing handler for channel " + fd + "\n" + sw.toString());
            }
            else
            {
                trace("removing handler for channel " + fd);
            }
        }

      assert(!_destroyed);
        _changes.add(new FdHandlerPair(fd, null));
        setInterrupt();
    }

    public void
    promoteFollower()
    {
        if(_sizeMax > 1)
        {
          synchronized(this)
          {
            assert(!_promote);
            _promote = true;
            notify();
                
            if(!_destroyed)
            {
                assert(_inUse >= 0);
                ++_inUse;
                
                if(_inUse == _sizeWarn)
                {
                  String s = "thread pool `" + _prefix + "' is running low on threads\n"
                      + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn;
                  _instance.initializationData().logger.warning(s);
                }
                
                assert(_inUse <= _running);
                if(_inUse < _sizeMax && _inUse == _running)
                {
                  try
                  {
                      EventHandlerThread thread = new EventHandlerThread(_programNamePrefix + _prefix + "-" +
                                                             _threadIndex++);
                      _threads.add(thread);
                      thread.start();
                      ++_running;
                  }
                  catch(RuntimeException ex)
                  {
                      java.io.StringWriter sw = new java.io.StringWriter();
                      java.io.PrintWriter pw = new java.io.PrintWriter(sw);
                      ex.printStackTrace(pw);
                      pw.flush();
                      String s = "cannot create thread for `" + _prefix + "':\n" + sw.toString();
                      _instance.initializationData().logger.error(s);
                  }
                }
            }
          }
        }
    }

    public void
    joinWithAllThreads()
    {
      //
      // _threads is immutable after destroy() has been called,
      // therefore no synchronization is needed. (Synchronization
      // wouldn't be possible here anyway, because otherwise the
      // other threads would never terminate.)
      //
      java.util.Iterator i = _threads.iterator();
      while(i.hasNext())
      {
          EventHandlerThread thread = (EventHandlerThread)i.next();
          
            while(true)
            {
                try
                {
                    thread.join();
                    break;
                }
                catch(InterruptedException ex)
                {
                }
            }
        }

      //
      // Cleanup the selector, and the socket pair.
      //
      try
      {
          if(_selector != null)
          {
            try
            {
                _selector.close();
            }
            catch(java.io.IOException ex)
            {
                //
                // BUGFIX:
                //
                // Ignore this exception. This shouldn't happen
                // but for some reasons the close() call raises
                // "java.io.IOException: Bad file descriptor" on
                // Mac OS X 10.3.x (it works fine on OS X 10.4.x)
                //
            }
            _selector = null;
          }

          if(_fdIntrWrite != null)
          {
            try
            {
                _fdIntrWrite.close();
            }
            catch(java.io.IOException ex)
            {
                //
                // BUGFIX:
                //
                // Ignore this exception. This shouldn't happen
                // but for some reasons the close() call raises
                // "java.io.IOException: No such file or
                // directory" under Linux with JDK 1.4.2.
                //
            }
            _fdIntrWrite = null;
          }

          if(_fdIntrRead != null)
          {
            _fdIntrRead.close();
            _fdIntrRead = null;
          }
      }
      catch(java.io.IOException ex)
      {
          java.io.StringWriter sw = new java.io.StringWriter();
          java.io.PrintWriter pw = new java.io.PrintWriter(sw);
          ex.printStackTrace(pw);
          pw.flush();
          String s = "exception in `" + _prefix + "' while calling close():\n" + sw.toString();
          _instance.initializationData().logger.error(s);
      }
    }

    public String
    prefix()
    {
      return _prefix;
    }
    
    private void
    clearInterrupt()
    {
        if(TRACE_INTERRUPT)
        {
            trace("clearInterrupt");
            if(TRACE_STACK_TRACE)
            {
                try
                {
                    throw new RuntimeException();
                }
                catch(RuntimeException ex)
                {
                    ex.printStackTrace();
                }
            }
        }

        byte b = 0;

        java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
        try
        {
            while(true)
            {
                buf.rewind();
                if(_fdIntrRead.read(buf) != 1)
                {
                    break;
                }

                if(TRACE_INTERRUPT)
                {
                    trace("clearInterrupt got byte " + (int)buf.get(0));
                }

                b = buf.get(0);
                break;
            }
        }
        catch(java.io.IOException ex)
        {
            Ice.SocketException se = new Ice.SocketException();
            se.initCause(ex);
            throw se;
        }
    }

    private void
    setInterrupt()
    {
        if(TRACE_INTERRUPT)
        {
            trace("setInterrupt()");
            if(TRACE_STACK_TRACE)
            {
                try
                {
                    throw new RuntimeException();
                }
                catch(RuntimeException ex)
                {
                    ex.printStackTrace();
                }
            }
        }

        java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
        buf.put(0, (byte)0);
        while(buf.hasRemaining())
        {
            try
            {
                _fdIntrWrite.write(buf);
            }
            catch(java.io.IOException ex)
            {
                Ice.SocketException se = new Ice.SocketException();
                se.initCause(ex);
                throw se;
            }
        }
    }

    //
    // Each thread supplies a BasicStream, to avoid creating excessive
    // garbage (Java only).
    //
    private boolean
    run(BasicStream stream)
    {
      if(_sizeMax > 1)
      {
          synchronized(this)
          {
            while(!_promote)
            {
                try
                {
                  wait();
                }
                catch(InterruptedException ex)
                {
                }
            }
            
            _promote = false;
          }
          
          if(TRACE_THREAD)
          {
            trace("thread " + Thread.currentThread() + " has the lock");
          }
      }

      while(true)
        {
          if(TRACE_REGISTRATION)
          {
            java.util.Set keys = _selector.keys();
            trace("selecting on " + keys.size() + " channels:");
            java.util.Iterator i = keys.iterator();
            while(i.hasNext())
            {
                java.nio.channels.SelectionKey key = (java.nio.channels.SelectionKey)i.next();
                trace("  " + key.channel());
            }
          }

          EventHandler handler = null;

          //
          // Only call select() if there are no pending handlers with additional data
          // for us to read.
          //
          if(!_pendingHandlers.isEmpty())
          {
            handler = (EventHandler)_pendingHandlers.removeFirst();
          }
          else
          {
            select();
          }

          boolean finished = false;
          boolean shutdown = false;

          if(handler == null)
          {
            synchronized(this)
            {
                if(_keys.size() == 0) // We initiate a shutdown if there is a thread pool timeout.
                {
                  if(TRACE_SELECT)
                  {
                      trace("timeout");
                  }
                  
                  assert(_timeout > 0);
                  _timeout = 0;
                  shutdown = true;
                }
                else
                {
                  if(_keys.contains(_fdIntrReadKey) && _fdIntrReadKey.isReadable())
                  {
                      if(TRACE_SELECT || TRACE_INTERRUPT)
                      {
                        trace("detected interrupt");
                      }
                      
                      //
                      // There are two possiblities for an interrupt:
                      //
                      // 1. The thread pool has been destroyed.
                      //
                      // 2. An event handler was registered or unregistered.
                      //
                      
                      //
                      // Thread pool destroyed?
                      //
                      if(_destroyed)
                      {
                        if(TRACE_SHUTDOWN)
                        {
                            trace("destroyed, thread id = " + Thread.currentThread());
                        }
                        
                        //
                        // Don't clear the interrupt fd if
                        // destroyed, so that the other threads
                        // exit as well.
                        //
                        return true;
                      }
                      
                      //
                      // Remove the interrupt channel from the
                      // selected key set.
                      //
                      _keys.remove(_fdIntrReadKey);
                      
                      clearInterrupt();
                      
                      //
                      // An event handler must have been registered
                      // or unregistered.
                      //
                      assert(!_changes.isEmpty());
                      FdHandlerPair change = (FdHandlerPair)_changes.removeFirst();
                      
                      if(change.handler != null) // Addition if handler is set.
                      {
                        int op;
                        if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
                        {
                            op = java.nio.channels.SelectionKey.OP_READ;
                        }
                        else
                        {
                            op = java.nio.channels.SelectionKey.OP_ACCEPT;
                        }
                            
                        java.nio.channels.SelectionKey key = null;
                        try
                        {
                            key = change.fd.register(_selector, op, change.handler);
                        }
                        catch(java.nio.channels.ClosedChannelException ex)
                        {
                            assert(false);
                        }
                        _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key));
                            
                        if(TRACE_REGISTRATION)
                        {
                            trace("added handler (" + change.handler.getClass().getName() + ") for fd " +
                                change.fd);
                        }
                            
                        continue;
                      }
                      else // Removal if handler is not set.
                      {
                        HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd);
                        assert(pair != null);
                        handler = pair.handler;
                        finished = true;
                        pair.key.cancel();
                            
                        if(TRACE_REGISTRATION)
                        {
                            trace("removed handler (" + handler.getClass().getName() + ") for fd " +
                                change.fd);
                        }
                            
                        // Don't continue; we have to call
                        // finished() on the event handler below,
                        // outside the thread synchronization.
                      }
                  }
                  else
                  {
                      java.nio.channels.SelectionKey key = null;
                      java.util.Iterator iter = _keys.iterator();
                      while(iter.hasNext())
                      {
                        //
                        // Ignore selection keys that have been cancelled
                        //
                        java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)iter.next();
                        iter.remove();
                        if(k.isValid() && key != _fdIntrReadKey)
                        {
                            if(TRACE_SELECT)
                            {
                              trace("found a key: " + keyToString(k));
                            }
                            
                            key = k;
                            break;
                        }
                      }
                      
                      if(key == null)
                      {
                        if(TRACE_SELECT)
                        {
                            trace("didn't find a valid key");
                        }
                        
                        continue;
                      }
                      
                      handler = (EventHandler)key.attachment();
                  }
                }
            }
          }

          //
          // Now we are outside the thread synchronization.
          //
          
          if(shutdown)
          {
            if(TRACE_SHUTDOWN)
            {
                trace("shutdown detected");
            }
            
            //
            // Initiate server shutdown.
            //
            ObjectAdapterFactory factory;
            try
            {
                factory = _instance.objectAdapterFactory();
            }
            catch(Ice.CommunicatorDestroyedException e)
            {
                continue;
            }

            promoteFollower();
            factory.shutdown();

            //
            // No "continue", because we want shutdown to be done in
            // its own thread from this pool. Therefore we called
            // promoteFollower().
            //
          }
          else
          {
            assert(handler != null);

            if(finished)
            {
                //
                // Notify a handler about its removal from
                // the thread pool.
                //
                try
                {
                  handler.finished(this);
                }
                catch(Ice.LocalException ex)
                {
                  java.io.StringWriter sw = new java.io.StringWriter();
                  java.io.PrintWriter pw = new java.io.PrintWriter(sw);
                  ex.printStackTrace(pw);
                  pw.flush();
                  String s = "exception in `" + _prefix + "' while calling finished():\n" +
                      sw.toString() + "\n" + handler.toString();
                  _instance.initializationData().logger.error(s);
                }

                //
                // No "continue", because we want finished() to be
                // called in its own thread from this pool. Note
                // that this means that finished() must call
                // promoteFollower().
                //
            }
            else
            {
                //
                // If the handler is "readable", try to read a
                // message.
                //
                try
                {
                  if(handler.readable())
                  {
                      try
                      {
                        //
                        // If read returns true, the handler has more data for the thread pool
                        // to process.
                        //
                        if(read(handler))
                        {
                            _pendingHandlers.add(handler);
                        }
                      }
                      catch(Ice.TimeoutException ex) // Expected.
                      {
                        continue;
                      }
                      catch(Ice.DatagramLimitException ex) // Expected.
                      {
                        continue;
                      }
                      catch(Ice.SocketException ex)
                      {
                        if(TRACE_EXCEPTION)
                        {
                            trace("informing handler (" + handler.getClass().getName() +
                                ") about exception " + ex);
                            ex.printStackTrace();
                        }
                            
                        handler.exception(ex);
                        continue;
                      }
                      catch(Ice.LocalException ex)
                      {
                        if(handler.datagram())
                        {
                            if(_instance.initializationData().properties.getPropertyAsInt(
                                                            "Ice.Warn.Connections") > 0)
                            {
                              _instance.initializationData().logger.warning(
                                  "datagram connection exception:\n" + ex + "\n" + handler.toString());
                            }
                        }
                        else
                        {
                            if(TRACE_EXCEPTION)
                            {
                              trace("informing handler (" + handler.getClass().getName() +
                                    ") about exception " + ex);
                              ex.printStackTrace();
                            }
                            
                            handler.exception(ex);
                        }
                        continue;
                      }
                        
                      stream.swap(handler._stream);
                      assert(stream.pos() == stream.size());
                  }
                      
                  //
                  // Provide a new message to the handler.
                  //
                  try
                  {
                      handler.message(stream, this);
                  }
                  catch(Ice.LocalException ex)
                  {
                      java.io.StringWriter sw = new java.io.StringWriter();
                      java.io.PrintWriter pw = new java.io.PrintWriter(sw);
                      ex.printStackTrace(pw);
                      pw.flush();
                      String s = "exception in `" + _prefix + "' while calling message():\n" +
                        sw.toString() + "\n" + handler.toString();
                      _instance.initializationData().logger.error(s);
                  }

                  //
                  // No "continue", because we want message() to
                  // be called in its own thread from this
                  // pool. Note that this means that message()
                  // must call promoteFollower().
                  //
                }
                finally
                {
                  stream.reset();
                }
            }
          }

          if(_sizeMax > 1)
          {
            synchronized(this)
            {
                if(!_destroyed)
                {
                  //
                  // First we reap threads that have been
                  // destroyed before.
                  //
                  int sz = _threads.size();
                  assert(_running <= sz);
                  if(_running < sz)
                  {
                      java.util.Iterator i = _threads.iterator();
                      while(i.hasNext())
                      {
                        EventHandlerThread thread = (EventHandlerThread)i.next();

                        if(!thread.isAlive())
                        {
                            try
                            {
                              thread.join();
                              i.remove();
                            }
                            catch(InterruptedException ex)
                            {
                            }
                        }
                      }
                  }
                  
                  //
                  // Now we check if this thread can be destroyed, based
                  // on a load factor.
                  //

                  //
                  // The load factor jumps immediately to the number of
                  // threads that are currently in use, but decays
                  // exponentially if the number of threads in use is
                  // smaller than the load factor. This reflects that we
                  // create threads immediately when they are needed,
                  // but want the number of threads to slowly decline to
                  // the configured minimum.
                  //
                  double inUse = (double)_inUse;
                  if(_load < inUse)
                  {
                      _load = inUse;
                  }
                  else
                  {
                      final double loadFactor = 0.05; // TODO: Configurable?
                      final double oneMinusLoadFactor = 1 - loadFactor;
                      _load = _load * oneMinusLoadFactor + _inUse * loadFactor;
                  }

                  if(_running > _size)
                  {
                      int load = (int)(_load + 0.5);

                      //
                      // We add one to the load factor because one
                      // additional thread is needed for select().
                      //
                      if(load + 1 < _running)
                      {
                        assert(_inUse > 0);
                        --_inUse;
                        
                        assert(_running > 0);
                        --_running;
                        
                        return false;
                      }
                  }
                  
                  assert(_inUse > 0);
                  --_inUse;
                }

                while(!_promote)
                {
                  try
                  {
                      wait();
                  }
                  catch(InterruptedException ex)
                  {
                  }
                }
                
                _promote = false;
            }
            
            if(TRACE_THREAD)
            {
                trace("thread " + Thread.currentThread() + " has the lock");
            }
          }
        }
    }

    private boolean
    read(EventHandler handler)
    {
      boolean moreData = false;

        BasicStream stream = handler._stream;

        if(stream.size() == 0)
        {
            stream.resize(Protocol.headerSize, true);
            stream.pos(0);
        }

        if(stream.pos() != stream.size())
        {
            moreData = handler.read(stream);
            assert(stream.pos() == stream.size());
        }

        int pos = stream.pos();
      if(pos < Protocol.headerSize)
      {
          //
          // This situation is possible for small UDP packets.
          //
          throw new Ice.IllegalMessageSizeException();
      }
        stream.pos(0);
      byte[] m = new byte[4];
      m[0] = stream.readByte();
      m[1] = stream.readByte();
      m[2] = stream.readByte();
      m[3] = stream.readByte();
      if(m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1]
         || m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
      {
          Ice.BadMagicException ex = new Ice.BadMagicException();
          ex.badMagic = m;
          throw ex;
      }

      byte pMajor = stream.readByte();
      byte pMinor = stream.readByte();
      if(pMajor != Protocol.protocolMajor || pMinor > Protocol.protocolMinor)
      {
          Ice.UnsupportedProtocolException e = new Ice.UnsupportedProtocolException();
          e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor;
          e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor;
          e.major = Protocol.protocolMajor;
          e.minor = Protocol.protocolMinor;
          throw e;
      }

      byte eMajor = stream.readByte();
      byte eMinor = stream.readByte();
      if(eMajor != Protocol.encodingMajor || eMinor > Protocol.encodingMinor)
      {
          Ice.UnsupportedEncodingException e = new Ice.UnsupportedEncodingException();
          e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor;
          e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor;
          e.major = Protocol.encodingMajor;
          e.minor = Protocol.encodingMinor;
          throw e;
      }

        byte messageType = stream.readByte();
        byte compress = stream.readByte();
        int size = stream.readInt();
        if(size < Protocol.headerSize)
        {
            throw new Ice.IllegalMessageSizeException();
        }
        if(size > _instance.messageSizeMax())
        {
            throw new Ice.MemoryLimitException();
        }
        if(size > stream.size())
        {
            stream.resize(size, true);
        }
        stream.pos(pos);

        if(stream.pos() != stream.size())
      {
          if(handler.datagram())
          {
            if(_warnUdp)
            {
                _instance.initializationData().logger.warning("DatagramLimitException: maximum size of "
                                                              + stream.pos() + " exceeded");
            }
            stream.pos(0);
            stream.resize(0, true);
            throw new Ice.DatagramLimitException();
          }
          else
          {
            moreData = handler.read(stream);
            assert(stream.pos() == stream.size());
          }
        }

      return moreData;
    }

/*
 *  Commented out because it is unused.
 *
    private void
    selectNonBlocking()
    {
        while(true)
        {
            try
            {
                if(TRACE_SELECT)
                {
                    trace("non-blocking select on " + _selector.keys().size() + " keys, thread id = " +
                          Thread.currentThread());
                }

                _selector.selectNow();

                if(TRACE_SELECT)
                {
                    if(_keys.size() > 0)
                    {
                        trace("after selectNow, there are " + _keys.size() + " selected keys:");
                        java.util.Iterator i = _keys.iterator();
                        while(i.hasNext())
                        {
                            java.nio.channels.SelectionKey key = (java.nio.channels.SelectionKey)i.next();
                            trace("  " + keyToString(key));
                        }
                    }
                }

                break;
            }
            catch(java.io.InterruptedIOException ex)
            {
                continue;
            }
            catch(java.io.IOException ex)
            {
                //
                // Pressing Ctrl-C causes select() to raise an
                // IOException, which seems like a JDK bug. We trap
                // for that special case here and ignore it.
                // Hopefully we're not masking something important!
                //
                if(ex.getMessage().indexOf("Interrupted system call") != -1)
                {
                    continue;
                }

                Ice.SocketException se = new Ice.SocketException();
                se.initCause(ex);
                //throw se;
            java.io.StringWriter sw = new java.io.StringWriter();
            java.io.PrintWriter pw = new java.io.PrintWriter(sw);
            se.printStackTrace(pw);
            pw.flush();
            String s = "exception in `" + _prefix + "':\n" + sw.toString();
            _instance.initializationData().logger.error(s);
            continue;
            }
        }
    }
*/

    private void
    select()
    {
        int ret = 0;

        while(true)
        {
            try
            {
                if(TRACE_SELECT)
                {
                    trace("select on " + _selector.keys().size() + " keys, thread id = " + Thread.currentThread());
                }

            if(_timeout > 0)
            {
                ret = _selector.select(_timeout * 1000);
            }
            else
            {
                ret = _selector.select();
            }
            }
            catch(java.io.IOException ex)
            {
                //
                // Pressing Ctrl-C causes select() to raise an
                // IOException, which seems like a JDK bug. We trap
                // for that special case here and ignore it.
                // Hopefully we're not masking something important!
                //
                if(Network.interrupted(ex))
                {
                    continue;
                }

                Ice.SocketException se = new Ice.SocketException();
                se.initCause(ex);
                //throw se;
            java.io.StringWriter sw = new java.io.StringWriter();
            java.io.PrintWriter pw = new java.io.PrintWriter(sw);
            se.printStackTrace(pw);
            pw.flush();
            String s = "exception in `" + _prefix + "':\n" + sw.toString();
            _instance.initializationData().logger.error(s);
            continue;
            }

            if(TRACE_SELECT)
            {
                trace("select() returned " + ret + ", _keys.size() = " + _keys.size());
            }

            break;
        }
    }

    private void
    trace(String msg)
    {
        System.err.println(_prefix + ": " + msg);
    }

    private String
    keyToString(java.nio.channels.SelectionKey key)
    {
        String ops = "[";
        if(key.isAcceptable())
        {
            ops += " OP_ACCEPT";
        }
        if(key.isReadable())
        {
            ops += " OP_READ";
        }
        if(key.isConnectable())
        {
            ops += " OP_CONNECT";
        }
        if(key.isWritable())
        {
            ops += " OP_WRITE";
        }
        ops += " ]";
        return key.channel() + " " + ops;
    }

    private static final class FdHandlerPair
    {
        java.nio.channels.SelectableChannel fd;
        EventHandler handler;

        FdHandlerPair(java.nio.channels.SelectableChannel fd, EventHandler handler)
        {
            this.fd = fd;
            this.handler = handler;
        }
    }

    private static final class HandlerKeyPair
    {
        EventHandler handler;
        java.nio.channels.SelectionKey key;

        HandlerKeyPair(EventHandler handler, java.nio.channels.SelectionKey key)
        {
            this.handler = handler;
            this.key = key;
        }
    }

    private Instance _instance;
    private boolean _destroyed;
    private final String _prefix;
    private final String _programNamePrefix;

    private java.nio.channels.ReadableByteChannel _fdIntrRead;
    private java.nio.channels.SelectionKey _fdIntrReadKey;
    private java.nio.channels.WritableByteChannel _fdIntrWrite;
    private java.nio.channels.Selector _selector;
    private java.util.Set _keys;

    private java.util.LinkedList _changes = new java.util.LinkedList();

    private java.util.HashMap _handlerMap = new java.util.HashMap();

    private int _timeout;

    //
    // Since the Java5 SSL transceiver can read more data from the socket than is
    // actually requested, we have to keep a separate list of handlers that need
    // the thread pool to read more data before it re-enters a blocking call to
    // select().
    //
    private java.util.LinkedList _pendingHandlers = new java.util.LinkedList();

    private final class EventHandlerThread extends Thread
    {
        EventHandlerThread(String name)
        {
            super(name);
        }

        public void
        run()
        {
          if(_instance.initializationData().threadHook != null)
          {
              _instance.initializationData().threadHook.start();
          }

            BasicStream stream = new BasicStream(_instance);

          boolean promote;

            try
            {
            promote = ThreadPool.this.run(stream);
            }
            catch(Ice.LocalException ex)
            {
                java.io.StringWriter sw = new java.io.StringWriter();
                java.io.PrintWriter pw = new java.io.PrintWriter(sw);
                ex.printStackTrace(pw);
                pw.flush();
                String s = "exception in `" + _prefix + "' thread " + getName() + ":\n" + sw.toString();
                _instance.initializationData().logger.error(s);
            promote = true;
            }
            catch(java.lang.Exception ex)
            {
                java.io.StringWriter sw = new java.io.StringWriter();
                java.io.PrintWriter pw = new java.io.PrintWriter(sw);
                ex.printStackTrace(pw);
                pw.flush();
                String s = "unknown exception in `" + _prefix + "' thread " + getName() + ":\n" + sw.toString();
                _instance.initializationData().logger.error(s);
            promote = true;
            }

          if(promote && _sizeMax > 1)
          {
            //
            // Promote a follower, but w/o modifying _inUse or
            // creating new threads.
            //
            synchronized(ThreadPool.this)
            {
                assert(!_promote);
                _promote = true;
                ThreadPool.this.notify();
            }
          }

            if(TRACE_THREAD)
            {
                trace("run() terminated");
            }

          if(_instance.initializationData().threadHook != null)
          {
              _instance.initializationData().threadHook.stop();
          }
        }
    }

    private final int _size; // Number of threads that are pre-created.
    private final int _sizeMax; // Maximum number of threads.
    private final int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed.

    private java.util.ArrayList _threads; // All threads, running or not.
    private int _threadIndex; // For assigning thread names.
    private int _running; // Number of running threads.
    private int _inUse; // Number of threads that are currently in use.
    private double _load; // Current load in number of threads.

    private boolean _promote;

    private final boolean _warnUdp;
}

Generated by  Doxygen 1.6.0   Back to index